Урок 28: Kafka — producer и consumer в Go

Урок 28. Kafka — producer и consumer в Go

🔄 Node.js → Go (ключевые аналоги):

📋 Что изучаем

📦 Инициализация проекта

mkdir go-kafka && cd go-kafka
go mod init go-kafka

# Устанавливаем kafka-go
go get github.com/segmentio/kafka-go
go get github.com/google/uuid
go mod tidy

# Запускаем Kafka (docker-compose)
cat > docker-compose.yml << 'EOF'
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    depends_on:
      - zookeeper
EOF

docker-compose up -d

💻 Файл: internal/kafka/producer.go

package kafka

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/google/uuid"
    "github.com/segmentio/kafka-go"
)

// Producer — обёртка над kafka.Writer
type Producer struct {
    writer *kafka.Writer
    topic  string
}

// ProducerConfig — конфигурация продюсера
type ProducerConfig struct {
    Brokers []string
    Topic   string
    // Асинхронная отправка (быстрее, но меньше гарантий)
    Async bool
}

// NewProducer — создаёт продюсера
func NewProducer(cfg ProducerConfig) *Producer {
    writer := &kafka.Writer{
        Addr:     kafka.TCP(cfg.Brokers...),
        Topic:    cfg.Topic,
        Balancer: &kafka.LeastBytes{}, // Равномерное распределение по партициям
        // Таймауты
        WriteTimeout: 10 * time.Second,
        ReadTimeout:  10 * time.Second,
        // Для гарантированной доставки
        RequiredAcks: kafka.RequireAll, // Ждём подтверждения от ВСЕХ реплик
        // Async: cfg.Async, // Раскомментировать для асинхронной отправки
        // Максимальное количество попыток
        MaxAttempts: 3,
    }

    return &Producer{writer: writer, topic: cfg.Topic}
}

// Event — структура события
type Event struct {
    ID        string    `json:"id"`
    Type      string    `json:"type"`
    Payload   any       `json:"payload"`
    Timestamp time.Time `json:"timestamp"`
}

// OrderCreatedPayload — пример payload для события
type OrderCreatedPayload struct {
    OrderID string  `json:"order_id"`
    UserID  string  `json:"user_id"`
    Amount  float64 `json:"amount"`
    Status  string  `json:"status"`
}

// PublishEvent — отправляет событие в Kafka
func (p *Producer) PublishEvent(ctx context.Context, eventType string, payload any) error {
    event := Event{
        ID:        uuid.New().String(),
        Type:      eventType,
        Payload:   payload,
        Timestamp: time.Now(),
    }

    // Сериализуем в JSON
    data, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("marshal event: %w", err)
    }

    // Формируем сообщение
    msg := kafka.Message{
        Key:   []byte(eventType), // Ключ для партиционирования
        Value: data,
        Headers: []kafka.Header{
            {Key: "event_type", Value: []byte(eventType)},
            {Key: "event_id", Value: []byte(event.ID)},
            {Key: "source", Value: []byte("order-service")},
        },
        Time: time.Now(),
    }

    // Отправляем
    if err := p.writer.WriteMessages(ctx, msg); err != nil {
        return fmt.Errorf("write message: %w", err)
    }

    log.Printf("📤 Событие отправлено: type=%s id=%s", eventType, event.ID)
    return nil
}

// PublishBatch — отправляет пачку сообщений
func (p *Producer) PublishBatch(ctx context.Context, events []struct {
    Type    string
    Payload any
}) error {
    messages := make([]kafka.Message, len(events))

    for i, e := range events {
        event := Event{
            ID:        uuid.New().String(),
            Type:      e.Type,
            Payload:   e.Payload,
            Timestamp: time.Now(),
        }

        data, err := json.Marshal(event)
        if err != nil {
            return fmt.Errorf("marshal event %d: %w", i, err)
        }

        messages[i] = kafka.Message{
            Key:   []byte(e.Type),
            Value: data,
            Time:  time.Now(),
        }
    }

    if err := p.writer.WriteMessages(ctx, messages...); err != nil {
        return fmt.Errorf("write batch: %w", err)
    }

    log.Printf("📤 Отправлено %d событий", len(events))
    return nil
}

// Close — закрывает продюсера
func (p *Producer) Close() error {
    log.Println("Закрытие Kafka producer...")
    return p.writer.Close()
}

💻 Файл: internal/kafka/consumer.go

package kafka

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/segmentio/kafka-go"
)

// Consumer — обёртка над kafka.Reader
type Consumer struct {
    reader  *kafka.Reader
    handler MessageHandler
}

// ConsumerConfig — конфигурация консьюмера
type ConsumerConfig struct {
    Brokers []string
    Topic   string
    GroupID string // Consumer group для параллельной обработки
    // Стратегия при старте: с начала или с последнего
    StartOffset int64 // kafka.FirstOffset или kafka.LastOffset
}

// MessageHandler — функция-обработчик сообщений
type MessageHandler func(ctx context.Context, msg Message) error

// Message — декодированное сообщение
type Message struct {
    ID        string
    Type      string
    Payload   json.RawMessage // Сырой JSON (можно десериализовать позже)
    Timestamp time.Time
    Headers   map[string]string
    // Оригинальное kafka-сообщение (для коммита)
    raw kafka.Message
}

// NewConsumer — создаёт консьюмера
func NewConsumer(cfg ConsumerConfig, handler MessageHandler) *Consumer {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:  cfg.Brokers,
        Topic:    cfg.Topic,
        GroupID:  cfg.GroupID,
        MinBytes: 10e3,  // 10KB — минимальный размер запроса
        MaxBytes: 10e6,  // 10MB — максимальный
        // Максимальное время ожидания новых сообщений
        MaxWait: 1 * time.Second,
        // Стратегия при старте группы
        StartOffset: cfg.StartOffset,
        // Таймаут на чтение
        ReadBatchTimeout: 5 * time.Second,
        // Автоматический коммит ОТКЛЮЧАЕМ (ручной контроль!)
        CommitInterval: -1,
    })

    return &Consumer{
        reader:  reader,
        handler: handler,
    }
}

// Start — запускает консьюмер (блокирующий вызов)
func (c *Consumer) Start(ctx context.Context) error {
    log.Printf("📥 Consumer запущен: topic=%s group=%s",
        c.reader.Config().Topic, c.reader.Config().GroupID)

    for {
        // Проверяем отмену контекста
        select {
        case <-ctx.Done():
            log.Println("Consumer остановлен по контексту")
            return ctx.Err()
        default:
        }

        // Читаем сообщение
        msg, err := c.reader.FetchMessage(ctx)
        if err != nil {
            if err == context.Canceled || err == context.DeadlineExceeded {
                return nil
            }
            log.Printf("❌ Ошибка чтения: %v", err)
            time.Sleep(1 * time.Second) // Пауза перед повтором
            continue
        }

        // Декодируем
        event, err := decodeMessage(msg)
        if err != nil {
            log.Printf("❌ Ошибка декодирования: %v", err)
            // Коммитим битые сообщения, чтобы не застрять
            c.reader.CommitMessages(ctx, msg)
            continue
        }

        // Обрабатываем
        log.Printf("📥 Получено: type=%s id=%s partition=%d offset=%d",
            event.Type, event.ID, msg.Partition, msg.Offset)

        if err := c.handler(ctx, *event); err != nil {
            log.Printf("❌ Ошибка обработки: %v", err)
            // НЕ коммитим — сообщение будет обработано снова
            // TODO: DLQ для проблемных сообщений
            continue
        }

        // КОММИТИМ после успешной обработки (at-least-once)
        if err := c.reader.CommitMessages(ctx, msg); err != nil {
            log.Printf("❌ Ошибка коммита: %v", err)
        }
    }
}

// StartWithGracefulShutdown — запускает с graceful shutdown
func (c *Consumer) StartWithGracefulShutdown(parentCtx context.Context) error {
    ctx, cancel := context.WithCancel(parentCtx)
    defer cancel()

    // Слушаем сигналы ОС
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

    go func() {
        sig := <-sigCh
        log.Printf("📡 Получен сигнал: %v. Останавливаем consumer...", sig)
        cancel()
    }()

    return c.Start(ctx)
}

// Stats — возвращает статистику консьюмера
func (c *Consumer) Stats() kafka.ReaderStats {
    return c.reader.Stats()
}

// Close — закрывает консьюмера
func (c *Consumer) Close() error {
    log.Println("Закрытие Kafka consumer...")
    return c.reader.Close()
}

// ╔══════════════════════════════════════════════════════════╗
// ║  ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ                               ║
// ╚══════════════════════════════════════════════════════════╝

// decodeMessage — декодирует kafka.Message в Message
func decodeMessage(msg kafka.Message) (*Message, error) {
    var event Event
    if err := json.Unmarshal(msg.Value, &event); err != nil {
        return nil, fmt.Errorf("unmarshal event: %w", err)
    }

    // Собираем заголовки
    headers := make(map[string]string, len(msg.Headers))
    for _, h := range msg.Headers {
        headers[h.Key] = string(h.Value)
    }

    return &Message{
        ID:        event.ID,
        Type:      event.Type,
        Payload:   json.RawMessage{}, // TODO: перепаковать
        Timestamp: event.Timestamp,
        Headers:   headers,
        raw:       msg,
    }, nil
}

💻 Файл: cmd/producer/main.go

package main

import (
    "context"
    "log"
    "time"

    "go-kafka/internal/kafka"
)

func main() {
    log.SetFlags(log.LstdFlags | log.Lmicroseconds)
    log.Println("📤 Запуск Kafka Producer...")

    // Создаём продюсера
    producer := kafka.NewProducer(kafka.ProducerConfig{
        Brokers: []string{"localhost:9092"},
        Topic:   "orders",
    })
    defer producer.Close()

    ctx := context.Background()

    // Отправляем одиночные события
    for i := 1; i <= 5; i++ {
        payload := kafka.OrderCreatedPayload{
            OrderID: fmt.Sprintf("order-%d", i),
            UserID:  fmt.Sprintf("user-%d", i%3+1),
            Amount:  float64(i*100) + 0.99,
            Status:  "created",
        }

        if err := producer.PublishEvent(ctx, "order.created", payload); err != nil {
            log.Printf("❌ Ошибка отправки: %v", err)
        }

        time.Sleep(500 * time.Millisecond)
    }

    // Отправляем батч
    log.Println("\n📤 Отправка батча...")
    batch := []struct {
        Type    string
        Payload any
    }{
        {Type: "order.paid", Payload: map[string]string{"order_id": "batch-1"}},
        {Type: "order.paid", Payload: map[string]string{"order_id": "batch-2"}},
        {Type: "order.cancelled", Payload: map[string]string{"order_id": "batch-3"}},
    }

    if err := producer.PublishBatch(ctx, batch); err != nil {
        log.Printf("❌ Ошибка батча: %v", err)
    }

    log.Println("✅ Producer завершил работу")
}

import "fmt"

💻 Файл: cmd/consumer/main.go

package main

import (
    "context"
    "encoding/json"
    "log"
    "time"

    "go-kafka/internal/kafka"
)

func main() {
    log.SetFlags(log.LstdFlags | log.Lmicroseconds)
    log.Println("📥 Запуск Kafka Consumer...")

    // Определяем обработчик
    handler := func(ctx context.Context, msg kafka.Message) error {
        log.Printf("  Обработка: type=%s id=%s", msg.Type, msg.ID)

        // В зависимости от типа — разная логика
        switch msg.Type {
        case "order.created":
            var payload kafka.OrderCreatedPayload
            if err := json.Unmarshal(msg.Payload, &payload); err != nil {
                return err
            }
            log.Printf("    → Новый заказ: %s (сумма: $%.2f)", payload.OrderID, payload.Amount)
            // Здесь: сохранение в БД, отправка email и т.д.

        case "order.paid":
            log.Printf("    → Заказ оплачен: %s", string(msg.Payload))

        case "order.cancelled":
            log.Printf("    → Заказ отменён: %s", string(msg.Payload))

        default:
            log.Printf("    → Неизвестный тип события: %s", msg.Type)
        }

        // Имитация обработки
        time.Sleep(100 * time.Millisecond)
        return nil
    }

    // Создаём консьюмера
    consumer := kafka.NewConsumer(kafka.ConsumerConfig{
        Brokers:     []string{"localhost:9092"},
        Topic:       "orders",
        GroupID:     "order-processor",
        StartOffset: kafka.FirstOffset, // Читаем с САМОГО начала
    }, handler)
    defer consumer.Close()

    // Запускаем с graceful shutdown
    if err := consumer.StartWithGracefulShutdown(context.Background()); err != nil {
        log.Printf("Consumer остановлен: %v", err)
    }

    log.Println("✅ Consumer завершил работу")
}

import "github.com/segmentio/kafka-go"

🚀 Запуск

# Запускаем Kafka
docker-compose up -d

# Ждём готовности (10-15 секунд)

# Терминал 1: Consumer (будет ждать сообщения)
go run ./cmd/consumer/main.go

# Терминал 2: Producer (отправит сообщения)
go run ./cmd/producer/main.go

# Просмотр сообщений в Kafka
docker exec -it go-kafka-kafka-1 kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --from-beginning

📊 Producer vs Consumer

КомпонентСтруктураМетодыГарантии
Writer (Producer)kafka.WriterWriteMessages, CloseRequiredAcks: None(0), Leader(1), All(-1)
Reader (Consumer)kafka.ReaderFetchMessage, CommitMessages, CloseAt-least-once (ручной коммит)
⚠️ Типичные ошибки:
💡 Практический совет:

💡 Best practices от сеньоров:

🔑 Ключевые концепции

Что нужно запомнить из этого урока:

💡 Для Node.js разработчика:

← Предыдущий урок Следующий урок →