kafkajs (npm) → github.com/segmentio/kafka-go
await producer.send({ topic, messages }) → writer.WriteMessages(ctx, msgs...)
await consumer.run({ eachMessage }) → reader.FetchMessage(ctx) + reader.CommitMessages(ctx, msg)
autoCommit: true → автоматический коммит (но лучше ручной!)
fromBeginning: true → StartOffset: kafka.FirstOffset
mkdir go-kafka && cd go-kafka
go mod init go-kafka
# Устанавливаем kafka-go
go get github.com/segmentio/kafka-go
go get github.com/google/uuid
go mod tidy
# Запускаем Kafka (docker-compose)
cat > docker-compose.yml << 'EOF'
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.5.0
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zookeeper
EOF
docker-compose up -d
internal/kafka/producer.gopackage kafka
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/google/uuid"
"github.com/segmentio/kafka-go"
)
// Producer — обёртка над kafka.Writer
type Producer struct {
writer *kafka.Writer
topic string
}
// ProducerConfig — конфигурация продюсера
type ProducerConfig struct {
Brokers []string
Topic string
// Асинхронная отправка (быстрее, но меньше гарантий)
Async bool
}
// NewProducer — создаёт продюсера
func NewProducer(cfg ProducerConfig) *Producer {
writer := &kafka.Writer{
Addr: kafka.TCP(cfg.Brokers...),
Topic: cfg.Topic,
Balancer: &kafka.LeastBytes{}, // Равномерное распределение по партициям
// Таймауты
WriteTimeout: 10 * time.Second,
ReadTimeout: 10 * time.Second,
// Для гарантированной доставки
RequiredAcks: kafka.RequireAll, // Ждём подтверждения от ВСЕХ реплик
// Async: cfg.Async, // Раскомментировать для асинхронной отправки
// Максимальное количество попыток
MaxAttempts: 3,
}
return &Producer{writer: writer, topic: cfg.Topic}
}
// Event — структура события
type Event struct {
ID string `json:"id"`
Type string `json:"type"`
Payload any `json:"payload"`
Timestamp time.Time `json:"timestamp"`
}
// OrderCreatedPayload — пример payload для события
type OrderCreatedPayload struct {
OrderID string `json:"order_id"`
UserID string `json:"user_id"`
Amount float64 `json:"amount"`
Status string `json:"status"`
}
// PublishEvent — отправляет событие в Kafka
func (p *Producer) PublishEvent(ctx context.Context, eventType string, payload any) error {
event := Event{
ID: uuid.New().String(),
Type: eventType,
Payload: payload,
Timestamp: time.Now(),
}
// Сериализуем в JSON
data, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("marshal event: %w", err)
}
// Формируем сообщение
msg := kafka.Message{
Key: []byte(eventType), // Ключ для партиционирования
Value: data,
Headers: []kafka.Header{
{Key: "event_type", Value: []byte(eventType)},
{Key: "event_id", Value: []byte(event.ID)},
{Key: "source", Value: []byte("order-service")},
},
Time: time.Now(),
}
// Отправляем
if err := p.writer.WriteMessages(ctx, msg); err != nil {
return fmt.Errorf("write message: %w", err)
}
log.Printf("📤 Событие отправлено: type=%s id=%s", eventType, event.ID)
return nil
}
// PublishBatch — отправляет пачку сообщений
func (p *Producer) PublishBatch(ctx context.Context, events []struct {
Type string
Payload any
}) error {
messages := make([]kafka.Message, len(events))
for i, e := range events {
event := Event{
ID: uuid.New().String(),
Type: e.Type,
Payload: e.Payload,
Timestamp: time.Now(),
}
data, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("marshal event %d: %w", i, err)
}
messages[i] = kafka.Message{
Key: []byte(e.Type),
Value: data,
Time: time.Now(),
}
}
if err := p.writer.WriteMessages(ctx, messages...); err != nil {
return fmt.Errorf("write batch: %w", err)
}
log.Printf("📤 Отправлено %d событий", len(events))
return nil
}
// Close — закрывает продюсера
func (p *Producer) Close() error {
log.Println("Закрытие Kafka producer...")
return p.writer.Close()
}
internal/kafka/consumer.gopackage kafka
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/segmentio/kafka-go"
)
// Consumer — обёртка над kafka.Reader
type Consumer struct {
reader *kafka.Reader
handler MessageHandler
}
// ConsumerConfig — конфигурация консьюмера
type ConsumerConfig struct {
Brokers []string
Topic string
GroupID string // Consumer group для параллельной обработки
// Стратегия при старте: с начала или с последнего
StartOffset int64 // kafka.FirstOffset или kafka.LastOffset
}
// MessageHandler — функция-обработчик сообщений
type MessageHandler func(ctx context.Context, msg Message) error
// Message — декодированное сообщение
type Message struct {
ID string
Type string
Payload json.RawMessage // Сырой JSON (можно десериализовать позже)
Timestamp time.Time
Headers map[string]string
// Оригинальное kafka-сообщение (для коммита)
raw kafka.Message
}
// NewConsumer — создаёт консьюмера
func NewConsumer(cfg ConsumerConfig, handler MessageHandler) *Consumer {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: cfg.Brokers,
Topic: cfg.Topic,
GroupID: cfg.GroupID,
MinBytes: 10e3, // 10KB — минимальный размер запроса
MaxBytes: 10e6, // 10MB — максимальный
// Максимальное время ожидания новых сообщений
MaxWait: 1 * time.Second,
// Стратегия при старте группы
StartOffset: cfg.StartOffset,
// Таймаут на чтение
ReadBatchTimeout: 5 * time.Second,
// Автоматический коммит ОТКЛЮЧАЕМ (ручной контроль!)
CommitInterval: -1,
})
return &Consumer{
reader: reader,
handler: handler,
}
}
// Start — запускает консьюмер (блокирующий вызов)
func (c *Consumer) Start(ctx context.Context) error {
log.Printf("📥 Consumer запущен: topic=%s group=%s",
c.reader.Config().Topic, c.reader.Config().GroupID)
for {
// Проверяем отмену контекста
select {
case <-ctx.Done():
log.Println("Consumer остановлен по контексту")
return ctx.Err()
default:
}
// Читаем сообщение
msg, err := c.reader.FetchMessage(ctx)
if err != nil {
if err == context.Canceled || err == context.DeadlineExceeded {
return nil
}
log.Printf("❌ Ошибка чтения: %v", err)
time.Sleep(1 * time.Second) // Пауза перед повтором
continue
}
// Декодируем
event, err := decodeMessage(msg)
if err != nil {
log.Printf("❌ Ошибка декодирования: %v", err)
// Коммитим битые сообщения, чтобы не застрять
c.reader.CommitMessages(ctx, msg)
continue
}
// Обрабатываем
log.Printf("📥 Получено: type=%s id=%s partition=%d offset=%d",
event.Type, event.ID, msg.Partition, msg.Offset)
if err := c.handler(ctx, *event); err != nil {
log.Printf("❌ Ошибка обработки: %v", err)
// НЕ коммитим — сообщение будет обработано снова
// TODO: DLQ для проблемных сообщений
continue
}
// КОММИТИМ после успешной обработки (at-least-once)
if err := c.reader.CommitMessages(ctx, msg); err != nil {
log.Printf("❌ Ошибка коммита: %v", err)
}
}
}
// StartWithGracefulShutdown — запускает с graceful shutdown
func (c *Consumer) StartWithGracefulShutdown(parentCtx context.Context) error {
ctx, cancel := context.WithCancel(parentCtx)
defer cancel()
// Слушаем сигналы ОС
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigCh
log.Printf("📡 Получен сигнал: %v. Останавливаем consumer...", sig)
cancel()
}()
return c.Start(ctx)
}
// Stats — возвращает статистику консьюмера
func (c *Consumer) Stats() kafka.ReaderStats {
return c.reader.Stats()
}
// Close — закрывает консьюмера
func (c *Consumer) Close() error {
log.Println("Закрытие Kafka consumer...")
return c.reader.Close()
}
// ╔══════════════════════════════════════════════════════════╗
// ║ ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ ║
// ╚══════════════════════════════════════════════════════════╝
// decodeMessage — декодирует kafka.Message в Message
func decodeMessage(msg kafka.Message) (*Message, error) {
var event Event
if err := json.Unmarshal(msg.Value, &event); err != nil {
return nil, fmt.Errorf("unmarshal event: %w", err)
}
// Собираем заголовки
headers := make(map[string]string, len(msg.Headers))
for _, h := range msg.Headers {
headers[h.Key] = string(h.Value)
}
return &Message{
ID: event.ID,
Type: event.Type,
Payload: json.RawMessage{}, // TODO: перепаковать
Timestamp: event.Timestamp,
Headers: headers,
raw: msg,
}, nil
}
cmd/producer/main.gopackage main
import (
"context"
"log"
"time"
"go-kafka/internal/kafka"
)
func main() {
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
log.Println("📤 Запуск Kafka Producer...")
// Создаём продюсера
producer := kafka.NewProducer(kafka.ProducerConfig{
Brokers: []string{"localhost:9092"},
Topic: "orders",
})
defer producer.Close()
ctx := context.Background()
// Отправляем одиночные события
for i := 1; i <= 5; i++ {
payload := kafka.OrderCreatedPayload{
OrderID: fmt.Sprintf("order-%d", i),
UserID: fmt.Sprintf("user-%d", i%3+1),
Amount: float64(i*100) + 0.99,
Status: "created",
}
if err := producer.PublishEvent(ctx, "order.created", payload); err != nil {
log.Printf("❌ Ошибка отправки: %v", err)
}
time.Sleep(500 * time.Millisecond)
}
// Отправляем батч
log.Println("\n📤 Отправка батча...")
batch := []struct {
Type string
Payload any
}{
{Type: "order.paid", Payload: map[string]string{"order_id": "batch-1"}},
{Type: "order.paid", Payload: map[string]string{"order_id": "batch-2"}},
{Type: "order.cancelled", Payload: map[string]string{"order_id": "batch-3"}},
}
if err := producer.PublishBatch(ctx, batch); err != nil {
log.Printf("❌ Ошибка батча: %v", err)
}
log.Println("✅ Producer завершил работу")
}
import "fmt"
cmd/consumer/main.gopackage main
import (
"context"
"encoding/json"
"log"
"time"
"go-kafka/internal/kafka"
)
func main() {
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
log.Println("📥 Запуск Kafka Consumer...")
// Определяем обработчик
handler := func(ctx context.Context, msg kafka.Message) error {
log.Printf(" Обработка: type=%s id=%s", msg.Type, msg.ID)
// В зависимости от типа — разная логика
switch msg.Type {
case "order.created":
var payload kafka.OrderCreatedPayload
if err := json.Unmarshal(msg.Payload, &payload); err != nil {
return err
}
log.Printf(" → Новый заказ: %s (сумма: $%.2f)", payload.OrderID, payload.Amount)
// Здесь: сохранение в БД, отправка email и т.д.
case "order.paid":
log.Printf(" → Заказ оплачен: %s", string(msg.Payload))
case "order.cancelled":
log.Printf(" → Заказ отменён: %s", string(msg.Payload))
default:
log.Printf(" → Неизвестный тип события: %s", msg.Type)
}
// Имитация обработки
time.Sleep(100 * time.Millisecond)
return nil
}
// Создаём консьюмера
consumer := kafka.NewConsumer(kafka.ConsumerConfig{
Brokers: []string{"localhost:9092"},
Topic: "orders",
GroupID: "order-processor",
StartOffset: kafka.FirstOffset, // Читаем с САМОГО начала
}, handler)
defer consumer.Close()
// Запускаем с graceful shutdown
if err := consumer.StartWithGracefulShutdown(context.Background()); err != nil {
log.Printf("Consumer остановлен: %v", err)
}
log.Println("✅ Consumer завершил работу")
}
import "github.com/segmentio/kafka-go"
# Запускаем Kafka
docker-compose up -d
# Ждём готовности (10-15 секунд)
# Терминал 1: Consumer (будет ждать сообщения)
go run ./cmd/consumer/main.go
# Терминал 2: Producer (отправит сообщения)
go run ./cmd/producer/main.go
# Просмотр сообщений в Kafka
docker exec -it go-kafka-kafka-1 kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic orders \
--from-beginning
| Компонент | Структура | Методы | Гарантии |
|---|---|---|---|
| Writer (Producer) | kafka.Writer | WriteMessages, Close | RequiredAcks: None(0), Leader(1), All(-1) |
| Reader (Consumer) | kafka.Reader | FetchMessage, CommitMessages, Close | At-least-once (ручной коммит) |
💡 Best practices от сеньоров:
💡 Для Node.js разработчика:
kafkajs — самый популярный клиент в Node.js. segmentio/kafka-go — его аналог в Go.
await producer.send() → В Go: writer.WriteMessages(ctx, msgs...).eachMessage → FetchMessage + CommitMessages.