Урок 29: Kafka — DLQ, ретраи, consumer groups, ребалансировка

Урок 29. Kafka — DLQ, ретраи, consumer groups, ребалансировка

🔄 Node.js → Go (ключевые аналоги):

📋 Что изучаем

📦 Инициализация проекта

mkdir go-kafka-advanced && cd go-kafka-advanced
go mod init go-kafka-advanced

go get github.com/segmentio/kafka-go
go get github.com/google/uuid
go mod tidy

# Структура
mkdir -p internal/kafka
mkdir -p cmd/consumer
mkdir -p cmd/producer

💻 Файл: internal/kafka/retry_dlq.go

package kafka

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "math"
    "time"

    "github.com/segmentio/kafka-go"
)

// ╔══════════════════════════════════════════════════════════╗
// ║  КОНФИГУРАЦИЯ РЕТРАЕВ                                  ║
// ╚══════════════════════════════════════════════════════════╝

// RetryConfig — конфигурация повторных попыток
type RetryConfig struct {
    MaxRetries int           // Максимальное количество попыток (включая первую)
    InitialBackoff time.Duration // Начальная задержка
    MaxBackoff     time.Duration // Максимальная задержка
    Multiplier     float64       // Множитель для exponential backoff
}

// DefaultRetryConfig — разумные значения по умолчанию
func DefaultRetryConfig() RetryConfig {
    return RetryConfig{
        MaxRetries:     3,
        InitialBackoff: 1 * time.Second,
        MaxBackoff:     60 * time.Second,
        Multiplier:     2.0,
    }
}

// ╔══════════════════════════════════════════════════════════╗
// ║  ЗАГОЛОВКИ ДЛЯ РЕТРАЕВ И DLQ                          ║
// ╚══════════════════════════════════════════════════════════╝

const (
    HeaderRetryCount    = "x-retry-count"
    HeaderOriginalTopic = "x-original-topic"
    HeaderOriginalID    = "x-original-id"
    HeaderDeadLetter    = "x-dead-letter"
    HeaderErrorReason   = "x-error-reason"
)

// ╔══════════════════════════════════════════════════════════╗
// ║  RETRY HANDLER (обработчик с ретраями)                 ║
// ╚══════════════════════════════════════════════════════════╝

// RetryableConsumer — consumer с поддержкой ретраев и DLQ
type RetryableConsumer struct {
    reader      *kafka.Reader
    dlqWriter   *kafka.Writer // Пишет в DLQ
    retryWriter *kafka.Writer // Пишет в retry-топик
    handler     MessageHandler
    config      RetryConfig
}

// RetryableConsumerConfig — конфигурация
type RetryableConsumerConfig struct {
    Brokers      []string
    Topic        string
    GroupID      string
    DLQTopic     string // dead-letter-queue
    RetryTopic   string // топик для ретраев (может совпадать с основным)
    RetryCfg     RetryConfig
    Handler      MessageHandler
}

// NewRetryableConsumer — создаёт consumer с ретраями
func NewRetryableConsumer(cfg RetryableConsumerConfig) *RetryableConsumer {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:       cfg.Brokers,
        Topic:         cfg.Topic,
        GroupID:       cfg.GroupID,
        MinBytes:      10e3,
        MaxBytes:      10e6,
        CommitInterval: -1, // Ручной коммит
    })

    dlqWriter := &kafka.Writer{
        Addr:     kafka.TCP(cfg.Brokers...),
        Topic:    cfg.DLQTopic,
        Balancer: &kafka.LeastBytes{},
    }

    retryWriter := &kafka.Writer{
        Addr:     kafka.TCP(cfg.Brokers...),
        Topic:    cfg.RetryTopic,
        Balancer: &kafka.LeastBytes{},
    }

    return &RetryableConsumer{
        reader:      reader,
        dlqWriter:   dlqWriter,
        retryWriter: retryWriter,
        handler:     cfg.Handler,
        config:      cfg.RetryCfg,
    }
}

// Start — запускает consumer с ретраями
func (c *RetryableConsumer) Start(ctx context.Context) error {
    log.Printf("📥 Consumer с ретраями запущен: topic=%s dlq=%s",
        c.reader.Config().Topic, c.dlqWriter.Topic)

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        msg, err := c.reader.FetchMessage(ctx)
        if err != nil {
            if err == context.Canceled || err == context.DeadlineExceeded {
                return nil
            }
            log.Printf("❌ Ошибка чтения: %v", err)
            time.Sleep(1 * time.Second)
            continue
        }

        // Обрабатываем с ретраями
        if err := c.processWithRetry(ctx, msg); err != nil {
            log.Printf("🗑️ Отправка в DLQ: %v", err)
            c.sendToDLQ(ctx, msg, err)
        }

        // Коммитим в любом случае (сообщение либо обработано, либо в DLQ)
        c.reader.CommitMessages(ctx, msg)
    }
}

// processWithRetry — обрабатывает сообщение с повторными попытками
func (c *RetryableConsumer) processWithRetry(ctx context.Context, msg kafka.Message) error {
    retryCount := getRetryCount(msg)

    for attempt := 0; attempt <= c.config.MaxRetries; attempt++ {
        // Декодируем
        event, err := decodeMessage(msg)
        if err != nil {
            return fmt.Errorf("decode error: %w", err)
        }

        // Обрабатываем
        err = c.handler(ctx, *event)

        if err == nil {
            // Успех!
            log.Printf("✅ Обработано (попытка %d/%d): %s",
                attempt+1, c.config.MaxRetries+1, event.ID)
            return nil
        }

        // Ошибка — будем ретраить
        if attempt < c.config.MaxRetries {
            backoff := c.calculateBackoff(attempt)
            log.Printf("⏳ Ретрай %d/%d через %v: %v",
                attempt+1, c.config.MaxRetries, backoff, err)

            // Отправляем в retry-топик с увеличенным счётчиком
            if err := c.sendToRetry(ctx, msg, attempt+1, err); err != nil {
                return fmt.Errorf("send to retry: %w", err)
            }
            return nil // Выходим — сообщение будет обработано снова из retry-топика
        }

        // Исчерпаны все попытки
        return fmt.Errorf("max retries exceeded (%d): %w", c.config.MaxRetries+1, err)
    }

    return nil
}

// sendToRetry — отправляет сообщение в retry-топик
func (c *RetryableConsumer) sendToRetry(ctx context.Context, msg kafka.Message, attempt int, originalErr error) error {
    retryMsg := kafka.Message{
        Key:   msg.Key,
        Value: msg.Value,
        Headers: []kafka.Header{
            {Key: HeaderRetryCount, Value: []byte(fmt.Sprintf("%d", attempt))},
            {Key: HeaderOriginalTopic, Value: []byte(msg.Topic)},
            {Key: HeaderErrorReason, Value: []byte(originalErr.Error())},
        },
        Time: time.Now(),
    }

    // Добавляем существующие заголовки
    retryMsg.Headers = append(retryMsg.Headers, msg.Headers...)

    if err := c.retryWriter.WriteMessages(ctx, retryMsg); err != nil {
        return fmt.Errorf("write retry message: %w", err)
    }

    log.Printf("📤 Сообщение отправлено в retry (попытка %d)", attempt)
    return nil
}

// sendToDLQ — отправляет сообщение в Dead Letter Queue
func (c *RetryableConsumer) sendToDLQ(ctx context.Context, msg kafka.Message, err error) {
    dlqMsg := kafka.Message{
        Key:   msg.Key,
        Value: msg.Value,
        Headers: []kafka.Header{
            {Key: HeaderDeadLetter, Value: []byte("true")},
            {Key: HeaderOriginalTopic, Value: []byte(msg.Topic)},
            {Key: HeaderErrorReason, Value: []byte(err.Error())},
        },
        Time: time.Now(),
    }

    dlqMsg.Headers = append(dlqMsg.Headers, msg.Headers...)

    if writeErr := c.dlqWriter.WriteMessages(ctx, dlqMsg); writeErr != nil {
        log.Printf("❌ КРИТИЧЕСКАЯ ОШИБКА: не удалось записать в DLQ: %v", writeErr)
        // В реальности — сохранить в файл/БД для ручного разбора
    } else {
        log.Printf("📤 Сообщение отправлено в DLQ")
    }
}

// calculateBackoff — вычисляет задержку для exponential backoff
func (c *RetryableConsumer) calculateBackoff(attempt int) time.Duration {
    backoff := float64(c.config.InitialBackoff) * math.Pow(c.config.Multiplier, float64(attempt))
    if backoff > float64(c.config.MaxBackoff) {
        backoff = float64(c.config.MaxBackoff)
    }
    return time.Duration(backoff)
}

// getRetryCount — извлекает счётчик ретраев из заголовков
func getRetryCount(msg kafka.Message) int {
    for _, h := range msg.Headers {
        if h.Key == HeaderRetryCount {
            var count int
            fmt.Sscanf(string(h.Value), "%d", &count)
            return count
        }
    }
    return 0
}

// Close — закрывает все соединения
func (c *RetryableConsumer) Close() error {
    var errs []error
    if err := c.reader.Close(); err != nil {
        errs = append(errs, err)
    }
    if err := c.dlqWriter.Close(); err != nil {
        errs = append(errs, err)
    }
    if err := c.retryWriter.Close(); err != nil {
        errs = append(errs, err)
    }
    if len(errs) > 0 {
        return fmt.Errorf("close errors: %v", errs)
    }
    return nil
}

💻 Файл: internal/kafka/batch_consumer.go

package kafka

import (
    "context"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
)

// ╔══════════════════════════════════════════════════════════╗
// ║  BATCH CONSUMER (пакетная обработка)                   ║
// ╚══════════════════════════════════════════════════════════╝

// BatchHandler — обработчик ПАЧКИ сообщений
type BatchHandler func(ctx context.Context, messages []Message) error

// BatchConsumer — consumer с пакетной обработкой
type BatchConsumer struct {
    reader     *kafka.Reader
    handler    BatchHandler
    batchSize  int
    batchTimeout time.Duration
}

// BatchConsumerConfig — конфигурация
type BatchConsumerConfig struct {
    Brokers      []string
    Topic        string
    GroupID      string
    BatchSize    int           // Максимальный размер батча
    BatchTimeout time.Duration // Максимальное время ожидания батча
    Handler      BatchHandler
}

// NewBatchConsumer — создаёт batch consumer
func NewBatchConsumer(cfg BatchConsumerConfig) *BatchConsumer {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:        cfg.Brokers,
        Topic:          cfg.Topic,
        GroupID:        cfg.GroupID,
        MinBytes:       10e3,
        MaxBytes:       10e6,
        CommitInterval: -1,
    })

    return &BatchConsumer{
        reader:       reader,
        handler:      cfg.Handler,
        batchSize:    cfg.BatchSize,
        batchTimeout: cfg.BatchTimeout,
    }
}

// Start — запускает пакетную обработку
func (c *BatchConsumer) Start(ctx context.Context) error {
    log.Printf("📥 Batch consumer: topic=%s batch_size=%d timeout=%v",
        c.reader.Config().Topic, c.batchSize, c.batchTimeout)

    batch := make([]kafka.Message, 0, c.batchSize)
    timer := time.NewTimer(c.batchTimeout)
    defer timer.Stop()

    for {
        select {
        case <-ctx.Done():
            c.commitBatch(ctx, batch)
            return ctx.Err()

        case <-timer.C:
            if len(batch) > 0 {
                c.processBatch(ctx, batch)
                batch = batch[:0]
            }
            timer.Reset(c.batchTimeout)

        default:
            msg, err := c.reader.FetchMessage(ctx)
            if err != nil {
                if err == context.Canceled || err == context.DeadlineExceeded {
                    c.processBatch(ctx, batch)
                    return nil
                }
                log.Printf("❌ Ошибка чтения: %v", err)
                time.Sleep(1 * time.Second)
                continue
            }

            batch = append(batch, msg)

            // Накопили батч — обрабатываем
            if len(batch) >= c.batchSize {
                c.processBatch(ctx, batch)
                batch = batch[:0]
                timer.Reset(c.batchTimeout)
            }
        }
    }
}

func (c *BatchConsumer) processBatch(ctx context.Context, messages []kafka.Message) {
    if len(messages) == 0 {
        return
    }

    // Декодируем все сообщения
    decoded := make([]Message, 0, len(messages))
    for _, msg := range messages {
        event, err := decodeMessage(msg)
        if err != nil {
            log.Printf("❌ Пропуск битого сообщения: %v", err)
            continue
        }
        decoded = append(decoded, *event)
    }

    // Обрабатываем батч
    log.Printf("📦 Обработка батча из %d сообщений", len(decoded))
    if err := c.handler(ctx, decoded); err != nil {
        log.Printf("❌ Ошибка обработки батча: %v", err)
        // В реальности — ретрай или DLQ
    }

    // Коммитим весь батч
    c.commitBatch(ctx, messages)
}

func (c *BatchConsumer) commitBatch(ctx context.Context, messages []kafka.Message) {
    if len(messages) == 0 {
        return
    }
    if err := c.reader.CommitMessages(ctx, messages...); err != nil {
        log.Printf("❌ Ошибка коммита батча: %v", err)
    }
}

func (c *BatchConsumer) Close() error {
    return c.reader.Close()
}

💻 Файл: cmd/consumer/main.go

package main

import (
    "context"
    "encoding/json"
    "errors"
    "log"
    "math/rand"
    "os"
    "os/signal"
    "syscall"
    "time"

    "go-kafka-advanced/internal/kafka"
)

func main() {
    log.SetFlags(log.LstdFlags | log.Lmicroseconds)
    log.Println("📥 Запуск Consumer с ретраями и DLQ...")

    // Обработчик (иногда падает — для демонстрации ретраев)
    handler := func(ctx context.Context, msg kafka.Message) error {
        log.Printf("  📨 Обработка: id=%s type=%s retry=%d",
            msg.ID, msg.Type, getRetryFromHeaders(msg.Headers))

        // Имитация: 30% ошибок при первой попытке
        if getRetryFromHeaders(msg.Headers) == 0 && rand.Float64() < 0.3 {
            return errors.New("transient error: database timeout")
        }

        // Успешная обработка
        var payload map[string]any
        json.Unmarshal(msg.Payload, &payload)
        log.Printf("  ✅ Успешно: %v", payload)
        return nil
    }

    consumer := kafka.NewRetryableConsumer(kafka.RetryableConsumerConfig{
        Brokers:    []string{"localhost:9092"},
        Topic:      "orders",
        GroupID:    "order-processor-v2",
        DLQTopic:   "orders.dlq",
        RetryTopic: "orders", // Ретраим в тот же топик
        RetryCfg:   kafka.DefaultRetryConfig(),
        Handler:    handler,
    })
    defer consumer.Close()

    // Graceful shutdown
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

    go func() {
        <-sigCh
        log.Println("🛑 Выключение consumer...")
        cancel()
    }()

    if err := consumer.Start(ctx); err != nil {
        log.Printf("Consumer остановлен: %v", err)
    }
}

func getRetryFromHeaders(headers map[string]string) int {
    if s, ok := headers[kafka.HeaderRetryCount]; ok {
        var n int
        fmt.Sscanf(s, "%d", &n)
        return n
    }
    return 0
}

import "fmt"

💻 Файл: cmd/producer/main.go

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "go-kafka-advanced/internal/kafka"
)

func main() {
    log.Println("📤 Producer: отправка тестовых сообщений...")

    producer := kafka.NewProducer(kafka.ProducerConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "orders",
    })
    defer producer.Close()

    ctx := context.Background()

    for i := 1; i <= 10; i++ {
        payload := map[string]any{
            "order_id": fmt.Sprintf("order-%d", i),
            "amount":   float64(i * 100),
        }

        if err := producer.PublishEvent(ctx, "order.created", payload); err != nil {
            log.Printf("❌ Ошибка: %v", err)
        }
        time.Sleep(200 * time.Millisecond)
    }

    log.Println("✅ Producer завершил работу")
}

🚀 Запуск и мониторинг

# Запускаем Kafka (если ещё не)
docker-compose -f ../lesson-28/docker-compose.yml up -d

# Создаём DLQ-топик
docker exec -it lesson-28-kafka-1 kafka-topics \
  --create --topic orders.dlq \
  --bootstrap-server localhost:9092 \
  --partitions 1 --replication-factor 1

# Consumer (терминал 1)
go run ./cmd/consumer/main.go

# Producer (терминал 2)
go run ./cmd/producer/main.go

# Просмотр DLQ
docker exec -it lesson-28-kafka-1 kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic orders.dlq \
  --from-beginning

# Мониторинг consumer group lag
docker exec -it lesson-28-kafka-1 kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --group order-processor-v2 \
  --describe

# Статистика consumer (в коде)
# stats := consumer.reader.Stats()
# log.Printf("Lag: %d, Messages: %d", stats.Lag, stats.Messages)

📊 Retry Strategies

СтратегияОписаниеКогда использовать
Fixed delayОдинаковая задержка между попыткамиПростые случаи
Exponential backoffЗадержка растёт: 1s, 2s, 4s, 8s…Перегрузка БД/API
JitterExponential + случайная добавкаThundering herd problem
Dead Letter QueueПроблемные сообщения → отдельный топикРучной разбор ошибок
⚠️ Типичные ошибки:
💡 Практический совет:

💡 Best practices от сеньоров:

🔑 Ключевые концепции

Что нужно запомнить из этого урока:

💡 Для Node.js разработчика:

← Предыдущий урок Следующий урок →