Урок 18: gRPC — стриминг, интерцепторы, дедлайны, retry

Урок 18. gRPC — стриминг, интерцепторы, дедлайны, retry

🔄 Node.js → Go (ключевые аналоги):

📋 Что изучаем

📦 Инициализация проекта

mkdir go-grpc-advanced && cd go-grpc-advanced
go mod init go-grpc-advanced

go get google.golang.org/grpc
go get google.golang.org/protobuf
go get github.com/google/uuid
go get google.golang.org/grpc/reflection
go mod tidy

# Создаём структуру
mkdir -p proto/events
mkdir -p gen/events
mkdir -p server
mkdir -p client

💻 Файл: proto/events/events.proto

syntax = "proto3";

package events;

option go_package = "go-grpc-advanced/gen/events;eventspb";

import "google/protobuf/timestamp.proto";

// ╔══════════════════════════════════════════════════════════╗
// ║  СЕРВИС С РАЗНЫМИ ТИПАМИ СТРИМИНГА                    ║
// ╚══════════════════════════════════════════════════════════╝

service EventService {
  // Unary — обычный запрос-ответ
  rpc Ping(PingRequest) returns (PingResponse);

  // Server streaming — сервер шлёт поток ответов
  // Клиент отправляет ОДИН запрос, получает МНОГО ответов
  rpc Subscribe(SubscribeRequest) returns (stream Event);

  // Client streaming — клиент шлёт поток запросов
  // Клиент отправляет МНОГО запросов, получает ОДИН ответ
  rpc BatchUpload(stream Event) returns (BatchUploadResponse);

  // Bidirectional streaming — оба шлют потоки
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

// ==========================================
// Сообщения
// ==========================================

message PingRequest {
  string message = 1;
}

message PingResponse {
  string message = 1;
  google.protobuf.Timestamp server_time = 2;
}

message Event {
  string id = 1;
  string type = 2;
  string payload = 3;
  google.protobuf.Timestamp timestamp = 4;
}

message SubscribeRequest {
  string topic = 1;
  int32 max_events = 2; // 0 = бесконечно
}

message BatchUploadResponse {
  int32 total_events = 1;
  int32 success_count = 2;
  repeated string failed_ids = 3;
}

message ChatMessage {
  string user = 1;
  string text = 2;
  google.protobuf.Timestamp timestamp = 3;
}

💻 Генерация кода

mkdir -p gen/events

protoc \
  --proto_path=proto \
  --go_out=gen/events --go_opt=paths=source_relative \
  --go-grpc_out=gen/events --go-grpc_opt=paths=source_relative \
  proto/events/*.proto

💻 Файл: server/main.go

package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "net"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/metadata"
    "google.golang.org/grpc/reflection"
    "google.golang.org/grpc/status"
    "google.golang.org/protobuf/types/known/timestamppb"
    "github.com/google/uuid"

    pb "go-grpc-advanced/gen/events"
)

// ╔══════════════════════════════════════════════════════════╗
// ║  ИНТЕРЦЕПТОРЫ                                           ║
// ╚══════════════════════════════════════════════════════════╝

// UnaryInterceptor — логирует все unary-запросы
func UnaryLoggingInterceptor(
    ctx context.Context,
    req interface{},
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (interface{}, error) {
    start := time.Now()

    // Извлекаем метаданные
    md, _ := metadata.FromIncomingContext(ctx)
    requestID := "unknown"
    if ids := md.Get("x-request-id"); len(ids) > 0 {
        requestID = ids[0]
    }

    log.Printf("[%s] → %s", requestID, info.FullMethod)

    // Вызываем реальный обработчик
    resp, err := handler(ctx, req)

    st, _ := status.FromError(err)
    log.Printf("[%s] ← %s (код=%s, время=%v)",
        requestID, info.FullMethod, st.Code(), time.Since(start))

    return resp, err
}

// StreamInterceptor — логирует стримы
func StreamLoggingInterceptor(
    srv interface{},
    ss grpc.ServerStream,
    info *grpc.StreamServerInfo,
    handler grpc.StreamHandler,
) error {
    start := time.Now()
    log.Printf("[stream] → %s", info.FullMethod)

    err := handler(srv, ss)

    st, _ := status.FromError(err)
    log.Printf("[stream] ← %s (код=%s, время=%v)",
        info.FullMethod, st.Code(), time.Since(start))

    return err
}

// ╔══════════════════════════════════════════════════════════╗
// ║  РЕАЛИЗАЦИЯ СЕРВЕРА                                     ║
// ╚══════════════════════════════════════════════════════════╝

type EventServer struct {
    pb.UnimplementedEventServiceServer
    mu        sync.RWMutex
    events    []*pb.Event
    chatRooms map[string]map[string]chan *pb.ChatMessage // topic → user → chan
}

func NewEventServer() *EventServer {
    return &EventServer{
        events:    make([]*pb.Event, 0),
        chatRooms: make(map[string]map[string]chan *pb.ChatMessage),
    }
}

// ==========================================
// 1. UNARY — Ping
// ==========================================

func (s *EventServer) Ping(
    ctx context.Context,
    req *pb.PingRequest,
) (*pb.PingResponse, error) {
    // Проверяем дедлайн
    if deadline, ok := ctx.Deadline(); ok {
        log.Printf("  Дедлайн запроса: %v (осталось %v)",
            deadline, time.Until(deadline))
    }

    return &pb.PingResponse{
        Message:    fmt.Sprintf("Pong: %s", req.Message),
        ServerTime: timestamppb.Now(),
    }, nil
}

// ==========================================
// 2. SERVER STREAMING — Subscribe
// ==========================================

func (s *EventServer) Subscribe(
    req *pb.SubscribeRequest,
    stream pb.EventService_SubscribeServer,
) error {
    log.Printf("📡 Клиент подписался на тему %q (макс=%d)", req.Topic, req.MaxEvents)

    count := int32(0)
    ticker := time.NewTicker(500 * time.Millisecond)
    defer ticker.Stop()

    for {
        select {
        case <-stream.Context().Done():
            // Клиент отключился или контекст отменён
            log.Printf("📡 Клиент отписался от %q", req.Topic)
            return stream.Context().Err()

        case <-ticker.C:
            event := &pb.Event{
                Id:        uuid.New().String(),
                Type:      req.Topic,
                Payload:   fmt.Sprintf("Событие #%d для %s", count+1, req.Topic),
                Timestamp: timestamppb.Now(),
            }

            // Отправляем событие клиенту
            if err := stream.Send(event); err != nil {
                if errors.Is(err, io.EOF) {
                    return nil
                }
                return err
            }

            log.Printf("  ✓ Отправлено событие: %s", event.Id[:8])
            count++

            if req.MaxEvents > 0 && count >= req.MaxEvents {
                log.Printf("📡 Достигнут лимит (%d)", req.MaxEvents)
                return nil
            }
        }
    }
}

// ==========================================
// 3. CLIENT STREAMING — BatchUpload
// ==========================================

func (s *EventServer) BatchUpload(
    stream pb.EventService_BatchUploadServer,
) error {
    var (
        totalEvents  int32
        successCount int32
        failedIDs    []string
    )

    log.Println("📥 Получение batch-загрузки...")

    for {
        event, err := stream.Recv()
        if err == io.EOF {
            // Клиент закончил отправку — отправляем ответ
            log.Printf("📥 Batch завершён: всего=%d успешно=%d ошибок=%d",
                totalEvents, successCount, len(failedIDs))

            return stream.SendAndClose(&pb.BatchUploadResponse{
                TotalEvents:  totalEvents,
                SuccessCount: successCount,
                FailedIds:    failedIDs,
            })
        }
        if err != nil {
            return fmt.Errorf("recv event: %w", err)
        }

        totalEvents++

        // Валидация
        if event.Type == "" {
            failedIDs = append(failedIDs, event.Id)
            log.Printf("  ❌ Событие %s: пустой тип", event.Id[:8])
            continue
        }

        // Сохраняем событие
        s.mu.Lock()
        s.events = append(s.events, event)
        s.mu.Unlock()

        successCount++
        log.Printf("  ✓ Событие %s: тип=%s", event.Id[:8], event.Type)
    }
}

// ==========================================
// 4. BIDIRECTIONAL STREAMING — Chat
// ==========================================

func (s *EventServer) Chat(
    stream pb.EventService_ChatServer,
) error {
    // Регистрируем пользователя
    user := fmt.Sprintf("user-%s", uuid.New().String()[:6])
    log.Printf("💬 %s подключился к чату", user)

    // Создаём канал для этого пользователя
    msgChan := make(chan *pb.ChatMessage, 10)

    s.mu.Lock()
    if s.chatRooms["general"] == nil {
        s.chatRooms["general"] = make(map[string]chan *pb.ChatMessage)
    }
    s.chatRooms["general"][user] = msgChan
    s.mu.Unlock()

    // Убираем при выходе
    defer func() {
        s.mu.Lock()
        delete(s.chatRooms["general"], user)
        s.mu.Unlock()
        close(msgChan)
        log.Printf("💬 %s отключился от чата", user)
    }()

    // Горутина: отправляем сообщения пользователю
    go func() {
        for msg := range msgChan {
            if err := stream.Send(msg); err != nil {
                return
            }
        }
    }()

    // Приветственное сообщение
    msgChan <- &pb.ChatMessage{
        User:      "System",
        Text:      fmt.Sprintf("%s присоединился", user),
        Timestamp: timestamppb.Now(),
    }

    // Читаем сообщения от пользователя
    for {
        msg, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }

        msg.Timestamp = timestamppb.Now()
        log.Printf("💬 [%s] %s: %s", msg.User, msg.User, msg.Text)

        // Рассылаем всем
        s.mu.RLock()
        for _, ch := range s.chatRooms["general"] {
            select {
            case ch <- msg:
            default:
                // Буфер переполнен — пропускаем
            }
        }
        s.mu.RUnlock()
    }
}

// ╔══════════════════════════════════════════════════════════╗
// ║  MAIN                                                    ║
// ╚══════════════════════════════════════════════════════════╝

func main() {
    log.SetFlags(log.LstdFlags | log.Lmicroseconds)
    log.Println("🚀 gRPC сервер со стримингом...")

    lis, err := net.Listen("tcp", ":50052")
    if err != nil {
        log.Fatalf("Порт: %v", err)
    }

    // Создаём сервер с интерцепторами
    grpcServer := grpc.NewServer(
        grpc.UnaryInterceptor(UnaryLoggingInterceptor),
        grpc.StreamInterceptor(StreamLoggingInterceptor),
    )

    // Регистрируем сервис
    pb.RegisterEventServiceServer(grpcServer, NewEventServer())

    // Включаем reflection (для grpcurl и отладки)
    reflection.Register(grpcServer)

    log.Println("✅ Сервер на :50052")
    log.Println("Методы:")
    log.Println("  Unary:           Ping")
    log.Println("  Server Stream:   Subscribe")
    log.Println("  Client Stream:   BatchUpload")
    log.Println("  Bidi Stream:     Chat")

    go func() {
        sigCh := make(chan os.Signal, 1)
        signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
        <-sigCh
        log.Println("🛑 Выключение...")
        grpcServer.GracefulStop()
    }()

    if err := grpcServer.Serve(lis); err != nil {
        log.Fatalf("Сервер: %v", err)
    }
}

import "errors"

💻 Файл: client/main.go

package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/metadata"
    "google.golang.org/grpc/status"
    "google.golang.org/protobuf/types/known/timestamppb"
    "github.com/google/uuid"

    pb "go-grpc-advanced/gen/events"
)

func main() {
    log.SetFlags(log.LstdFlags | log.Lmicroseconds)
    log.Println("🔌 Клиент gRPC (стриминг)")

    conn, err := grpc.Dial(
        "localhost:50052",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithBlock(),
        grpc.WithTimeout(5*time.Second),
    )
    if err != nil {
        log.Fatalf("Подключение: %v", err)
    }
    defer conn.Close()

    client := pb.NewEventServiceClient(conn)
    ctx := context.Background()

    // ==========================================
    // 1. UNARY PING (с метаданными и дедлайном)
    // ==========================================
    log.Println("\n── 1. UNARY PING ──")

    // Добавляем метаданные
    md := metadata.Pairs("x-request-id", uuid.New().String())
    mdCtx := metadata.NewOutgoingContext(ctx, md)

    // Устанавливаем дедлайн
    deadlineCtx, cancel := context.WithTimeout(mdCtx, 2*time.Second)
    defer cancel()

    pingResp, err := client.Ping(deadlineCtx, &pb.PingRequest{
        Message: "Hello from client!",
    })
    if err != nil {
        st, _ := status.FromError(err)
        log.Printf("  ❌ Ошибка: код=%s, %s", st.Code(), st.Message())
    } else {
        log.Printf("%s (серверное время: %s)",
            pingResp.Message,
            pingResp.ServerTime.AsTime().Format(time.RFC3339))
    }

    // ==========================================
    // 2. SERVER STREAMING — Subscribe
    // ==========================================
    log.Println("\n── 2. SERVER STREAMING ──")

    subCtx, subCancel := context.WithCancel(ctx)
    defer subCancel()

    stream, err := client.Subscribe(subCtx, &pb.SubscribeRequest{
        Topic:     "system.alerts",
        MaxEvents: 5,
    })
    if err != nil {
        log.Fatalf("Подписка: %v", err)
    }

    // Читаем события из потока
    log.Println("  Ожидание событий...")
    for {
        event, err := stream.Recv()
        if err == io.EOF {
            log.Println("  ✓ Поток завершён (сервер закрыл)")
            break
        }
        if err != nil {
            log.Printf("  ❌ Ошибка: %v", err)
            break
        }
        log.Printf("  📩 [%s] %s: %s",
            event.Id[:8], event.Type, event.Payload)
    }

    // ==========================================
    // 3. CLIENT STREAMING — BatchUpload
    // ==========================================
    log.Println("\n── 3. CLIENT STREAMING ──")

    uploadStream, err := client.BatchUpload(ctx)
    if err != nil {
        log.Fatalf("Batch upload: %v", err)
    }

    // Отправляем несколько событий
    eventsToSend := []struct {
        eventType string
        payload   string
    }{
        {"order.created", "Заказ #1"},
        {"order.created", "Заказ #2"},
        {"", "Невалидное событие"}, // Пустой тип
        {"payment.received", "Оплата #3"},
    }

    for _, e := range eventsToSend {
        event := &pb.Event{
            Id:        uuid.New().String(),
            Type:      e.eventType,
            Payload:   e.payload,
            Timestamp: timestamppb.Now(),
        }
        if err := uploadStream.Send(event); err != nil {
            log.Printf("  ❌ Ошибка отправки: %v", err)
            break
        }
        log.Printf("  ↑ Отправлено: [%s] %s", event.Id[:8], event.Type)
        time.Sleep(200 * time.Millisecond)
    }

    // Закрываем поток и получаем ответ
    batchResp, err := uploadStream.CloseAndRecv()
    if err != nil {
        log.Printf("  ❌ Ошибка: %v", err)
    } else {
        log.Printf("  ✓ Batch завершён: всего=%d, успешно=%d, ошибок=%d",
            batchResp.TotalEvents, batchResp.SuccessCount, len(batchResp.FailedIds))
    }

    // ==========================================
    // 4. BIDIRECTIONAL STREAMING — Chat
    // ==========================================
    log.Println("\n── 4. BIDIRECTIONAL STREAMING ──")

    chatCtx, chatCancel := context.WithTimeout(ctx, 5*time.Second)
    defer chatCancel()

    chatStream, err := client.Chat(chatCtx)
    if err != nil {
        log.Fatalf("Chat: %v", err)
    }

    // Получаем сообщения в горутине
    go func() {
        for {
            msg, err := chatStream.Recv()
            if err == io.EOF {
                return
            }
            if err != nil {
                if st, _ := status.FromError(err); st.Code() == codes.DeadlineExceeded {
                    return
                }
                log.Printf("  ❌ Recv: %v", err)
                return
            }
            log.Printf("  💬 [%s] %s: %s",
                msg.Timestamp.AsTime().Format("15:04:05"),
                msg.User, msg.Text)
        }
    }()

    // Отправляем сообщения
    messages := []string{
        "Привет всем!",
        "Как дела?",
        "Изучаю gRPC стриминг",
    }

    for _, text := range messages {
        if err := chatStream.Send(&pb.ChatMessage{
            User: "Go-Client",
            Text: text,
        }); err != nil {
            log.Printf("  ❌ Send: %v", err)
            break
        }
        time.Sleep(500 * time.Millisecond)
    }

    // Даём время на получение ответов
    time.Sleep(1 * time.Second)
    chatStream.CloseSend()

    log.Println("\n✅ Клиент завершил работу")
}

🚀 Запуск

# Терминал 1: Сервер
go run ./server/main.go

# Терминал 2: Клиент
go run ./client/main.go

# Отладка через grpcurl (если установлен)
grpcurl -plaintext localhost:50052 list
grpcurl -plaintext localhost:50052 events.EventService/Ping

📊 Типы стриминга в gRPC

ТипКлиентСерверАналогИспользование
Unary1 запрос1 ответHTTP RESTCRUD, запросы
Server Streaming1 запросN ответовSSE / chunkedЛоги, метрики, подписки
Client StreamingN запросов1 ответUpload chunkedЗагрузка файлов, batch
BidirectionalN запросовN ответовWebSocketЧат, real-time игры

📊 gRPC Status Codes

КодКонстантаКогда использовать
0OKУспех
3InvalidArgumentНевалидные данные
4DeadlineExceededТаймаут
5NotFoundРесурс не найден
6AlreadyExistsДубликат
7PermissionDeniedНет доступа
13InternalВнутренняя ошибка
14UnavailableСервис недоступен (для retry)

🚀 Запуск программы

# Генерация кода
protoc --proto_path=proto \
  --go_out=gen/events --go_opt=paths=source_relative \
  --go-grpc_out=gen/events --go-grpc_opt=paths=source_relative \
  proto/events/*.proto

# Сервер
go run ./server/main.go

# Клиент (в другом терминале)
go run ./client/main.go
⚠️ Типичные ошибки:
💡 Практический совет:

💡 Best practices от сеньоров:

🔑 Ключевые концепции

Что нужно запомнить из этого урока:

💡 Для Node.js разработчика:

← Предыдущий урок Следующий урок →