DLQ в kafkajs → ручная реализация через второй топик
retry: { retries: 3 } → заголовки x-retry-count + пауза
consumer.run({ eachBatch }) → пакетная обработка в Go
Ребалансировка при добавлении consumer → автоматически в consumer group
Lag monitoring → reader.Stats() / kafka-consumer-groups
mkdir go-kafka-advanced && cd go-kafka-advanced
go mod init go-kafka-advanced
go get github.com/segmentio/kafka-go
go get github.com/google/uuid
go mod tidy
# Структура
mkdir -p internal/kafka
mkdir -p cmd/consumer
mkdir -p cmd/producer
internal/kafka/retry_dlq.gopackage kafka
import (
"context"
"encoding/json"
"fmt"
"log"
"math"
"time"
"github.com/segmentio/kafka-go"
)
// ╔══════════════════════════════════════════════════════════╗
// ║ КОНФИГУРАЦИЯ РЕТРАЕВ ║
// ╚══════════════════════════════════════════════════════════╝
// RetryConfig — конфигурация повторных попыток
type RetryConfig struct {
MaxRetries int // Максимальное количество попыток (включая первую)
InitialBackoff time.Duration // Начальная задержка
MaxBackoff time.Duration // Максимальная задержка
Multiplier float64 // Множитель для exponential backoff
}
// DefaultRetryConfig — разумные значения по умолчанию
func DefaultRetryConfig() RetryConfig {
return RetryConfig{
MaxRetries: 3,
InitialBackoff: 1 * time.Second,
MaxBackoff: 60 * time.Second,
Multiplier: 2.0,
}
}
// ╔══════════════════════════════════════════════════════════╗
// ║ ЗАГОЛОВКИ ДЛЯ РЕТРАЕВ И DLQ ║
// ╚══════════════════════════════════════════════════════════╝
const (
HeaderRetryCount = "x-retry-count"
HeaderOriginalTopic = "x-original-topic"
HeaderOriginalID = "x-original-id"
HeaderDeadLetter = "x-dead-letter"
HeaderErrorReason = "x-error-reason"
)
// ╔══════════════════════════════════════════════════════════╗
// ║ RETRY HANDLER (обработчик с ретраями) ║
// ╚══════════════════════════════════════════════════════════╝
// RetryableConsumer — consumer с поддержкой ретраев и DLQ
type RetryableConsumer struct {
reader *kafka.Reader
dlqWriter *kafka.Writer // Пишет в DLQ
retryWriter *kafka.Writer // Пишет в retry-топик
handler MessageHandler
config RetryConfig
}
// RetryableConsumerConfig — конфигурация
type RetryableConsumerConfig struct {
Brokers []string
Topic string
GroupID string
DLQTopic string // dead-letter-queue
RetryTopic string // топик для ретраев (может совпадать с основным)
RetryCfg RetryConfig
Handler MessageHandler
}
// NewRetryableConsumer — создаёт consumer с ретраями
func NewRetryableConsumer(cfg RetryableConsumerConfig) *RetryableConsumer {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: cfg.Brokers,
Topic: cfg.Topic,
GroupID: cfg.GroupID,
MinBytes: 10e3,
MaxBytes: 10e6,
CommitInterval: -1, // Ручной коммит
})
dlqWriter := &kafka.Writer{
Addr: kafka.TCP(cfg.Brokers...),
Topic: cfg.DLQTopic,
Balancer: &kafka.LeastBytes{},
}
retryWriter := &kafka.Writer{
Addr: kafka.TCP(cfg.Brokers...),
Topic: cfg.RetryTopic,
Balancer: &kafka.LeastBytes{},
}
return &RetryableConsumer{
reader: reader,
dlqWriter: dlqWriter,
retryWriter: retryWriter,
handler: cfg.Handler,
config: cfg.RetryCfg,
}
}
// Start — запускает consumer с ретраями
func (c *RetryableConsumer) Start(ctx context.Context) error {
log.Printf("📥 Consumer с ретраями запущен: topic=%s dlq=%s",
c.reader.Config().Topic, c.dlqWriter.Topic)
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
msg, err := c.reader.FetchMessage(ctx)
if err != nil {
if err == context.Canceled || err == context.DeadlineExceeded {
return nil
}
log.Printf("❌ Ошибка чтения: %v", err)
time.Sleep(1 * time.Second)
continue
}
// Обрабатываем с ретраями
if err := c.processWithRetry(ctx, msg); err != nil {
log.Printf("🗑️ Отправка в DLQ: %v", err)
c.sendToDLQ(ctx, msg, err)
}
// Коммитим в любом случае (сообщение либо обработано, либо в DLQ)
c.reader.CommitMessages(ctx, msg)
}
}
// processWithRetry — обрабатывает сообщение с повторными попытками
func (c *RetryableConsumer) processWithRetry(ctx context.Context, msg kafka.Message) error {
retryCount := getRetryCount(msg)
for attempt := 0; attempt <= c.config.MaxRetries; attempt++ {
// Декодируем
event, err := decodeMessage(msg)
if err != nil {
return fmt.Errorf("decode error: %w", err)
}
// Обрабатываем
err = c.handler(ctx, *event)
if err == nil {
// Успех!
log.Printf("✅ Обработано (попытка %d/%d): %s",
attempt+1, c.config.MaxRetries+1, event.ID)
return nil
}
// Ошибка — будем ретраить
if attempt < c.config.MaxRetries {
backoff := c.calculateBackoff(attempt)
log.Printf("⏳ Ретрай %d/%d через %v: %v",
attempt+1, c.config.MaxRetries, backoff, err)
// Отправляем в retry-топик с увеличенным счётчиком
if err := c.sendToRetry(ctx, msg, attempt+1, err); err != nil {
return fmt.Errorf("send to retry: %w", err)
}
return nil // Выходим — сообщение будет обработано снова из retry-топика
}
// Исчерпаны все попытки
return fmt.Errorf("max retries exceeded (%d): %w", c.config.MaxRetries+1, err)
}
return nil
}
// sendToRetry — отправляет сообщение в retry-топик
func (c *RetryableConsumer) sendToRetry(ctx context.Context, msg kafka.Message, attempt int, originalErr error) error {
retryMsg := kafka.Message{
Key: msg.Key,
Value: msg.Value,
Headers: []kafka.Header{
{Key: HeaderRetryCount, Value: []byte(fmt.Sprintf("%d", attempt))},
{Key: HeaderOriginalTopic, Value: []byte(msg.Topic)},
{Key: HeaderErrorReason, Value: []byte(originalErr.Error())},
},
Time: time.Now(),
}
// Добавляем существующие заголовки
retryMsg.Headers = append(retryMsg.Headers, msg.Headers...)
if err := c.retryWriter.WriteMessages(ctx, retryMsg); err != nil {
return fmt.Errorf("write retry message: %w", err)
}
log.Printf("📤 Сообщение отправлено в retry (попытка %d)", attempt)
return nil
}
// sendToDLQ — отправляет сообщение в Dead Letter Queue
func (c *RetryableConsumer) sendToDLQ(ctx context.Context, msg kafka.Message, err error) {
dlqMsg := kafka.Message{
Key: msg.Key,
Value: msg.Value,
Headers: []kafka.Header{
{Key: HeaderDeadLetter, Value: []byte("true")},
{Key: HeaderOriginalTopic, Value: []byte(msg.Topic)},
{Key: HeaderErrorReason, Value: []byte(err.Error())},
},
Time: time.Now(),
}
dlqMsg.Headers = append(dlqMsg.Headers, msg.Headers...)
if writeErr := c.dlqWriter.WriteMessages(ctx, dlqMsg); writeErr != nil {
log.Printf("❌ КРИТИЧЕСКАЯ ОШИБКА: не удалось записать в DLQ: %v", writeErr)
// В реальности — сохранить в файл/БД для ручного разбора
} else {
log.Printf("📤 Сообщение отправлено в DLQ")
}
}
// calculateBackoff — вычисляет задержку для exponential backoff
func (c *RetryableConsumer) calculateBackoff(attempt int) time.Duration {
backoff := float64(c.config.InitialBackoff) * math.Pow(c.config.Multiplier, float64(attempt))
if backoff > float64(c.config.MaxBackoff) {
backoff = float64(c.config.MaxBackoff)
}
return time.Duration(backoff)
}
// getRetryCount — извлекает счётчик ретраев из заголовков
func getRetryCount(msg kafka.Message) int {
for _, h := range msg.Headers {
if h.Key == HeaderRetryCount {
var count int
fmt.Sscanf(string(h.Value), "%d", &count)
return count
}
}
return 0
}
// Close — закрывает все соединения
func (c *RetryableConsumer) Close() error {
var errs []error
if err := c.reader.Close(); err != nil {
errs = append(errs, err)
}
if err := c.dlqWriter.Close(); err != nil {
errs = append(errs, err)
}
if err := c.retryWriter.Close(); err != nil {
errs = append(errs, err)
}
if len(errs) > 0 {
return fmt.Errorf("close errors: %v", errs)
}
return nil
}
internal/kafka/batch_consumer.gopackage kafka
import (
"context"
"log"
"time"
"github.com/segmentio/kafka-go"
)
// ╔══════════════════════════════════════════════════════════╗
// ║ BATCH CONSUMER (пакетная обработка) ║
// ╚══════════════════════════════════════════════════════════╝
// BatchHandler — обработчик ПАЧКИ сообщений
type BatchHandler func(ctx context.Context, messages []Message) error
// BatchConsumer — consumer с пакетной обработкой
type BatchConsumer struct {
reader *kafka.Reader
handler BatchHandler
batchSize int
batchTimeout time.Duration
}
// BatchConsumerConfig — конфигурация
type BatchConsumerConfig struct {
Brokers []string
Topic string
GroupID string
BatchSize int // Максимальный размер батча
BatchTimeout time.Duration // Максимальное время ожидания батча
Handler BatchHandler
}
// NewBatchConsumer — создаёт batch consumer
func NewBatchConsumer(cfg BatchConsumerConfig) *BatchConsumer {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: cfg.Brokers,
Topic: cfg.Topic,
GroupID: cfg.GroupID,
MinBytes: 10e3,
MaxBytes: 10e6,
CommitInterval: -1,
})
return &BatchConsumer{
reader: reader,
handler: cfg.Handler,
batchSize: cfg.BatchSize,
batchTimeout: cfg.BatchTimeout,
}
}
// Start — запускает пакетную обработку
func (c *BatchConsumer) Start(ctx context.Context) error {
log.Printf("📥 Batch consumer: topic=%s batch_size=%d timeout=%v",
c.reader.Config().Topic, c.batchSize, c.batchTimeout)
batch := make([]kafka.Message, 0, c.batchSize)
timer := time.NewTimer(c.batchTimeout)
defer timer.Stop()
for {
select {
case <-ctx.Done():
c.commitBatch(ctx, batch)
return ctx.Err()
case <-timer.C:
if len(batch) > 0 {
c.processBatch(ctx, batch)
batch = batch[:0]
}
timer.Reset(c.batchTimeout)
default:
msg, err := c.reader.FetchMessage(ctx)
if err != nil {
if err == context.Canceled || err == context.DeadlineExceeded {
c.processBatch(ctx, batch)
return nil
}
log.Printf("❌ Ошибка чтения: %v", err)
time.Sleep(1 * time.Second)
continue
}
batch = append(batch, msg)
// Накопили батч — обрабатываем
if len(batch) >= c.batchSize {
c.processBatch(ctx, batch)
batch = batch[:0]
timer.Reset(c.batchTimeout)
}
}
}
}
func (c *BatchConsumer) processBatch(ctx context.Context, messages []kafka.Message) {
if len(messages) == 0 {
return
}
// Декодируем все сообщения
decoded := make([]Message, 0, len(messages))
for _, msg := range messages {
event, err := decodeMessage(msg)
if err != nil {
log.Printf("❌ Пропуск битого сообщения: %v", err)
continue
}
decoded = append(decoded, *event)
}
// Обрабатываем батч
log.Printf("📦 Обработка батча из %d сообщений", len(decoded))
if err := c.handler(ctx, decoded); err != nil {
log.Printf("❌ Ошибка обработки батча: %v", err)
// В реальности — ретрай или DLQ
}
// Коммитим весь батч
c.commitBatch(ctx, messages)
}
func (c *BatchConsumer) commitBatch(ctx context.Context, messages []kafka.Message) {
if len(messages) == 0 {
return
}
if err := c.reader.CommitMessages(ctx, messages...); err != nil {
log.Printf("❌ Ошибка коммита батча: %v", err)
}
}
func (c *BatchConsumer) Close() error {
return c.reader.Close()
}
cmd/consumer/main.gopackage main
import (
"context"
"encoding/json"
"errors"
"log"
"math/rand"
"os"
"os/signal"
"syscall"
"time"
"go-kafka-advanced/internal/kafka"
)
func main() {
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
log.Println("📥 Запуск Consumer с ретраями и DLQ...")
// Обработчик (иногда падает — для демонстрации ретраев)
handler := func(ctx context.Context, msg kafka.Message) error {
log.Printf(" 📨 Обработка: id=%s type=%s retry=%d",
msg.ID, msg.Type, getRetryFromHeaders(msg.Headers))
// Имитация: 30% ошибок при первой попытке
if getRetryFromHeaders(msg.Headers) == 0 && rand.Float64() < 0.3 {
return errors.New("transient error: database timeout")
}
// Успешная обработка
var payload map[string]any
json.Unmarshal(msg.Payload, &payload)
log.Printf(" ✅ Успешно: %v", payload)
return nil
}
consumer := kafka.NewRetryableConsumer(kafka.RetryableConsumerConfig{
Brokers: []string{"localhost:9092"},
Topic: "orders",
GroupID: "order-processor-v2",
DLQTopic: "orders.dlq",
RetryTopic: "orders", // Ретраим в тот же топик
RetryCfg: kafka.DefaultRetryConfig(),
Handler: handler,
})
defer consumer.Close()
// Graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigCh
log.Println("🛑 Выключение consumer...")
cancel()
}()
if err := consumer.Start(ctx); err != nil {
log.Printf("Consumer остановлен: %v", err)
}
}
func getRetryFromHeaders(headers map[string]string) int {
if s, ok := headers[kafka.HeaderRetryCount]; ok {
var n int
fmt.Sscanf(s, "%d", &n)
return n
}
return 0
}
import "fmt"
cmd/producer/main.gopackage main
import (
"context"
"fmt"
"log"
"time"
"go-kafka-advanced/internal/kafka"
)
func main() {
log.Println("📤 Producer: отправка тестовых сообщений...")
producer := kafka.NewProducer(kafka.ProducerConfig{
Brokers: []string{"localhost:9092"},
Topic: "orders",
})
defer producer.Close()
ctx := context.Background()
for i := 1; i <= 10; i++ {
payload := map[string]any{
"order_id": fmt.Sprintf("order-%d", i),
"amount": float64(i * 100),
}
if err := producer.PublishEvent(ctx, "order.created", payload); err != nil {
log.Printf("❌ Ошибка: %v", err)
}
time.Sleep(200 * time.Millisecond)
}
log.Println("✅ Producer завершил работу")
}
# Запускаем Kafka (если ещё не)
docker-compose -f ../lesson-28/docker-compose.yml up -d
# Создаём DLQ-топик
docker exec -it lesson-28-kafka-1 kafka-topics \
--create --topic orders.dlq \
--bootstrap-server localhost:9092 \
--partitions 1 --replication-factor 1
# Consumer (терминал 1)
go run ./cmd/consumer/main.go
# Producer (терминал 2)
go run ./cmd/producer/main.go
# Просмотр DLQ
docker exec -it lesson-28-kafka-1 kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic orders.dlq \
--from-beginning
# Мониторинг consumer group lag
docker exec -it lesson-28-kafka-1 kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--group order-processor-v2 \
--describe
# Статистика consumer (в коде)
# stats := consumer.reader.Stats()
# log.Printf("Lag: %d, Messages: %d", stats.Lag, stats.Messages)
| Стратегия | Описание | Когда использовать |
|---|---|---|
| Fixed delay | Одинаковая задержка между попытками | Простые случаи |
| Exponential backoff | Задержка растёт: 1s, 2s, 4s, 8s… | Перегрузка БД/API |
| Jitter | Exponential + случайная добавка | Thundering herd problem |
| Dead Letter Queue | Проблемные сообщения → отдельный топик | Ручной разбор ошибок |
💡 Best practices от сеньоров:
💡 Для Node.js разработчика:
retry: { retries: 3 }. В Go — через заголовки и повторную отправку.consumer.events. В kafka-go — reader.Stats().