Server streaming → response.write() + response.end() в Node.js
Client streaming → request.on('data') + request.on('end')
Bidi streaming → WebSocket (но типизированный!)
Interceptors → middleware в Express/gRPC
Deadline → context.WithTimeout (урок 9)
Retry → автоматически встроен в gRPC (настройка через ServiceConfig)
mkdir go-grpc-advanced && cd go-grpc-advanced
go mod init go-grpc-advanced
go get google.golang.org/grpc
go get google.golang.org/protobuf
go get github.com/google/uuid
go get google.golang.org/grpc/reflection
go mod tidy
# Создаём структуру
mkdir -p proto/events
mkdir -p gen/events
mkdir -p server
mkdir -p client
proto/events/events.protosyntax = "proto3";
package events;
option go_package = "go-grpc-advanced/gen/events;eventspb";
import "google/protobuf/timestamp.proto";
// ╔══════════════════════════════════════════════════════════╗
// ║ СЕРВИС С РАЗНЫМИ ТИПАМИ СТРИМИНГА ║
// ╚══════════════════════════════════════════════════════════╝
service EventService {
// Unary — обычный запрос-ответ
rpc Ping(PingRequest) returns (PingResponse);
// Server streaming — сервер шлёт поток ответов
// Клиент отправляет ОДИН запрос, получает МНОГО ответов
rpc Subscribe(SubscribeRequest) returns (stream Event);
// Client streaming — клиент шлёт поток запросов
// Клиент отправляет МНОГО запросов, получает ОДИН ответ
rpc BatchUpload(stream Event) returns (BatchUploadResponse);
// Bidirectional streaming — оба шлют потоки
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}
// ==========================================
// Сообщения
// ==========================================
message PingRequest {
string message = 1;
}
message PingResponse {
string message = 1;
google.protobuf.Timestamp server_time = 2;
}
message Event {
string id = 1;
string type = 2;
string payload = 3;
google.protobuf.Timestamp timestamp = 4;
}
message SubscribeRequest {
string topic = 1;
int32 max_events = 2; // 0 = бесконечно
}
message BatchUploadResponse {
int32 total_events = 1;
int32 success_count = 2;
repeated string failed_ids = 3;
}
message ChatMessage {
string user = 1;
string text = 2;
google.protobuf.Timestamp timestamp = 3;
}
mkdir -p gen/events
protoc \
--proto_path=proto \
--go_out=gen/events --go_opt=paths=source_relative \
--go-grpc_out=gen/events --go-grpc_opt=paths=source_relative \
proto/events/*.proto
server/main.gopackage main
import (
"context"
"fmt"
"io"
"log"
"net"
"os"
"os/signal"
"sync"
"syscall"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/google/uuid"
pb "go-grpc-advanced/gen/events"
)
// ╔══════════════════════════════════════════════════════════╗
// ║ ИНТЕРЦЕПТОРЫ ║
// ╚══════════════════════════════════════════════════════════╝
// UnaryInterceptor — логирует все unary-запросы
func UnaryLoggingInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
start := time.Now()
// Извлекаем метаданные
md, _ := metadata.FromIncomingContext(ctx)
requestID := "unknown"
if ids := md.Get("x-request-id"); len(ids) > 0 {
requestID = ids[0]
}
log.Printf("[%s] → %s", requestID, info.FullMethod)
// Вызываем реальный обработчик
resp, err := handler(ctx, req)
st, _ := status.FromError(err)
log.Printf("[%s] ← %s (код=%s, время=%v)",
requestID, info.FullMethod, st.Code(), time.Since(start))
return resp, err
}
// StreamInterceptor — логирует стримы
func StreamLoggingInterceptor(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
start := time.Now()
log.Printf("[stream] → %s", info.FullMethod)
err := handler(srv, ss)
st, _ := status.FromError(err)
log.Printf("[stream] ← %s (код=%s, время=%v)",
info.FullMethod, st.Code(), time.Since(start))
return err
}
// ╔══════════════════════════════════════════════════════════╗
// ║ РЕАЛИЗАЦИЯ СЕРВЕРА ║
// ╚══════════════════════════════════════════════════════════╝
type EventServer struct {
pb.UnimplementedEventServiceServer
mu sync.RWMutex
events []*pb.Event
chatRooms map[string]map[string]chan *pb.ChatMessage // topic → user → chan
}
func NewEventServer() *EventServer {
return &EventServer{
events: make([]*pb.Event, 0),
chatRooms: make(map[string]map[string]chan *pb.ChatMessage),
}
}
// ==========================================
// 1. UNARY — Ping
// ==========================================
func (s *EventServer) Ping(
ctx context.Context,
req *pb.PingRequest,
) (*pb.PingResponse, error) {
// Проверяем дедлайн
if deadline, ok := ctx.Deadline(); ok {
log.Printf(" Дедлайн запроса: %v (осталось %v)",
deadline, time.Until(deadline))
}
return &pb.PingResponse{
Message: fmt.Sprintf("Pong: %s", req.Message),
ServerTime: timestamppb.Now(),
}, nil
}
// ==========================================
// 2. SERVER STREAMING — Subscribe
// ==========================================
func (s *EventServer) Subscribe(
req *pb.SubscribeRequest,
stream pb.EventService_SubscribeServer,
) error {
log.Printf("📡 Клиент подписался на тему %q (макс=%d)", req.Topic, req.MaxEvents)
count := int32(0)
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-stream.Context().Done():
// Клиент отключился или контекст отменён
log.Printf("📡 Клиент отписался от %q", req.Topic)
return stream.Context().Err()
case <-ticker.C:
event := &pb.Event{
Id: uuid.New().String(),
Type: req.Topic,
Payload: fmt.Sprintf("Событие #%d для %s", count+1, req.Topic),
Timestamp: timestamppb.Now(),
}
// Отправляем событие клиенту
if err := stream.Send(event); err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return err
}
log.Printf(" ✓ Отправлено событие: %s", event.Id[:8])
count++
if req.MaxEvents > 0 && count >= req.MaxEvents {
log.Printf("📡 Достигнут лимит (%d)", req.MaxEvents)
return nil
}
}
}
}
// ==========================================
// 3. CLIENT STREAMING — BatchUpload
// ==========================================
func (s *EventServer) BatchUpload(
stream pb.EventService_BatchUploadServer,
) error {
var (
totalEvents int32
successCount int32
failedIDs []string
)
log.Println("📥 Получение batch-загрузки...")
for {
event, err := stream.Recv()
if err == io.EOF {
// Клиент закончил отправку — отправляем ответ
log.Printf("📥 Batch завершён: всего=%d успешно=%d ошибок=%d",
totalEvents, successCount, len(failedIDs))
return stream.SendAndClose(&pb.BatchUploadResponse{
TotalEvents: totalEvents,
SuccessCount: successCount,
FailedIds: failedIDs,
})
}
if err != nil {
return fmt.Errorf("recv event: %w", err)
}
totalEvents++
// Валидация
if event.Type == "" {
failedIDs = append(failedIDs, event.Id)
log.Printf(" ❌ Событие %s: пустой тип", event.Id[:8])
continue
}
// Сохраняем событие
s.mu.Lock()
s.events = append(s.events, event)
s.mu.Unlock()
successCount++
log.Printf(" ✓ Событие %s: тип=%s", event.Id[:8], event.Type)
}
}
// ==========================================
// 4. BIDIRECTIONAL STREAMING — Chat
// ==========================================
func (s *EventServer) Chat(
stream pb.EventService_ChatServer,
) error {
// Регистрируем пользователя
user := fmt.Sprintf("user-%s", uuid.New().String()[:6])
log.Printf("💬 %s подключился к чату", user)
// Создаём канал для этого пользователя
msgChan := make(chan *pb.ChatMessage, 10)
s.mu.Lock()
if s.chatRooms["general"] == nil {
s.chatRooms["general"] = make(map[string]chan *pb.ChatMessage)
}
s.chatRooms["general"][user] = msgChan
s.mu.Unlock()
// Убираем при выходе
defer func() {
s.mu.Lock()
delete(s.chatRooms["general"], user)
s.mu.Unlock()
close(msgChan)
log.Printf("💬 %s отключился от чата", user)
}()
// Горутина: отправляем сообщения пользователю
go func() {
for msg := range msgChan {
if err := stream.Send(msg); err != nil {
return
}
}
}()
// Приветственное сообщение
msgChan <- &pb.ChatMessage{
User: "System",
Text: fmt.Sprintf("%s присоединился", user),
Timestamp: timestamppb.Now(),
}
// Читаем сообщения от пользователя
for {
msg, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
msg.Timestamp = timestamppb.Now()
log.Printf("💬 [%s] %s: %s", msg.User, msg.User, msg.Text)
// Рассылаем всем
s.mu.RLock()
for _, ch := range s.chatRooms["general"] {
select {
case ch <- msg:
default:
// Буфер переполнен — пропускаем
}
}
s.mu.RUnlock()
}
}
// ╔══════════════════════════════════════════════════════════╗
// ║ MAIN ║
// ╚══════════════════════════════════════════════════════════╝
func main() {
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
log.Println("🚀 gRPC сервер со стримингом...")
lis, err := net.Listen("tcp", ":50052")
if err != nil {
log.Fatalf("Порт: %v", err)
}
// Создаём сервер с интерцепторами
grpcServer := grpc.NewServer(
grpc.UnaryInterceptor(UnaryLoggingInterceptor),
grpc.StreamInterceptor(StreamLoggingInterceptor),
)
// Регистрируем сервис
pb.RegisterEventServiceServer(grpcServer, NewEventServer())
// Включаем reflection (для grpcurl и отладки)
reflection.Register(grpcServer)
log.Println("✅ Сервер на :50052")
log.Println("Методы:")
log.Println(" Unary: Ping")
log.Println(" Server Stream: Subscribe")
log.Println(" Client Stream: BatchUpload")
log.Println(" Bidi Stream: Chat")
go func() {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
log.Println("🛑 Выключение...")
grpcServer.GracefulStop()
}()
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("Сервер: %v", err)
}
}
import "errors"
client/main.gopackage main
import (
"context"
"fmt"
"io"
"log"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/google/uuid"
pb "go-grpc-advanced/gen/events"
)
func main() {
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
log.Println("🔌 Клиент gRPC (стриминг)")
conn, err := grpc.Dial(
"localhost:50052",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
grpc.WithTimeout(5*time.Second),
)
if err != nil {
log.Fatalf("Подключение: %v", err)
}
defer conn.Close()
client := pb.NewEventServiceClient(conn)
ctx := context.Background()
// ==========================================
// 1. UNARY PING (с метаданными и дедлайном)
// ==========================================
log.Println("\n── 1. UNARY PING ──")
// Добавляем метаданные
md := metadata.Pairs("x-request-id", uuid.New().String())
mdCtx := metadata.NewOutgoingContext(ctx, md)
// Устанавливаем дедлайн
deadlineCtx, cancel := context.WithTimeout(mdCtx, 2*time.Second)
defer cancel()
pingResp, err := client.Ping(deadlineCtx, &pb.PingRequest{
Message: "Hello from client!",
})
if err != nil {
st, _ := status.FromError(err)
log.Printf(" ❌ Ошибка: код=%s, %s", st.Code(), st.Message())
} else {
log.Printf(" ✓ %s (серверное время: %s)",
pingResp.Message,
pingResp.ServerTime.AsTime().Format(time.RFC3339))
}
// ==========================================
// 2. SERVER STREAMING — Subscribe
// ==========================================
log.Println("\n── 2. SERVER STREAMING ──")
subCtx, subCancel := context.WithCancel(ctx)
defer subCancel()
stream, err := client.Subscribe(subCtx, &pb.SubscribeRequest{
Topic: "system.alerts",
MaxEvents: 5,
})
if err != nil {
log.Fatalf("Подписка: %v", err)
}
// Читаем события из потока
log.Println(" Ожидание событий...")
for {
event, err := stream.Recv()
if err == io.EOF {
log.Println(" ✓ Поток завершён (сервер закрыл)")
break
}
if err != nil {
log.Printf(" ❌ Ошибка: %v", err)
break
}
log.Printf(" 📩 [%s] %s: %s",
event.Id[:8], event.Type, event.Payload)
}
// ==========================================
// 3. CLIENT STREAMING — BatchUpload
// ==========================================
log.Println("\n── 3. CLIENT STREAMING ──")
uploadStream, err := client.BatchUpload(ctx)
if err != nil {
log.Fatalf("Batch upload: %v", err)
}
// Отправляем несколько событий
eventsToSend := []struct {
eventType string
payload string
}{
{"order.created", "Заказ #1"},
{"order.created", "Заказ #2"},
{"", "Невалидное событие"}, // Пустой тип
{"payment.received", "Оплата #3"},
}
for _, e := range eventsToSend {
event := &pb.Event{
Id: uuid.New().String(),
Type: e.eventType,
Payload: e.payload,
Timestamp: timestamppb.Now(),
}
if err := uploadStream.Send(event); err != nil {
log.Printf(" ❌ Ошибка отправки: %v", err)
break
}
log.Printf(" ↑ Отправлено: [%s] %s", event.Id[:8], event.Type)
time.Sleep(200 * time.Millisecond)
}
// Закрываем поток и получаем ответ
batchResp, err := uploadStream.CloseAndRecv()
if err != nil {
log.Printf(" ❌ Ошибка: %v", err)
} else {
log.Printf(" ✓ Batch завершён: всего=%d, успешно=%d, ошибок=%d",
batchResp.TotalEvents, batchResp.SuccessCount, len(batchResp.FailedIds))
}
// ==========================================
// 4. BIDIRECTIONAL STREAMING — Chat
// ==========================================
log.Println("\n── 4. BIDIRECTIONAL STREAMING ──")
chatCtx, chatCancel := context.WithTimeout(ctx, 5*time.Second)
defer chatCancel()
chatStream, err := client.Chat(chatCtx)
if err != nil {
log.Fatalf("Chat: %v", err)
}
// Получаем сообщения в горутине
go func() {
for {
msg, err := chatStream.Recv()
if err == io.EOF {
return
}
if err != nil {
if st, _ := status.FromError(err); st.Code() == codes.DeadlineExceeded {
return
}
log.Printf(" ❌ Recv: %v", err)
return
}
log.Printf(" 💬 [%s] %s: %s",
msg.Timestamp.AsTime().Format("15:04:05"),
msg.User, msg.Text)
}
}()
// Отправляем сообщения
messages := []string{
"Привет всем!",
"Как дела?",
"Изучаю gRPC стриминг",
}
for _, text := range messages {
if err := chatStream.Send(&pb.ChatMessage{
User: "Go-Client",
Text: text,
}); err != nil {
log.Printf(" ❌ Send: %v", err)
break
}
time.Sleep(500 * time.Millisecond)
}
// Даём время на получение ответов
time.Sleep(1 * time.Second)
chatStream.CloseSend()
log.Println("\n✅ Клиент завершил работу")
}
# Терминал 1: Сервер
go run ./server/main.go
# Терминал 2: Клиент
go run ./client/main.go
# Отладка через grpcurl (если установлен)
grpcurl -plaintext localhost:50052 list
grpcurl -plaintext localhost:50052 events.EventService/Ping
| Тип | Клиент | Сервер | Аналог | Использование |
|---|---|---|---|---|
| Unary | 1 запрос | 1 ответ | HTTP REST | CRUD, запросы |
| Server Streaming | 1 запрос | N ответов | SSE / chunked | Логи, метрики, подписки |
| Client Streaming | N запросов | 1 ответ | Upload chunked | Загрузка файлов, batch |
| Bidirectional | N запросов | N ответов | WebSocket | Чат, real-time игры |
| Код | Константа | Когда использовать |
|---|---|---|
| 0 | OK | Успех |
| 3 | InvalidArgument | Невалидные данные |
| 4 | DeadlineExceeded | Таймаут |
| 5 | NotFound | Ресурс не найден |
| 6 | AlreadyExists | Дубликат |
| 7 | PermissionDenied | Нет доступа |
| 13 | Internal | Внутренняя ошибка |
| 14 | Unavailable | Сервис недоступен (для retry) |
# Генерация кода
protoc --proto_path=proto \
--go_out=gen/events --go_opt=paths=source_relative \
--go-grpc_out=gen/events --go-grpc_opt=paths=source_relative \
proto/events/*.proto
# Сервер
go run ./server/main.go
# Клиент (в другом терминале)
go run ./client/main.go
stream.Context().Done() для отмены.
💡 Best practices от сеньоров:
stream.Send() в цикле. Клиент читает через stream.Recv() до io.EOF.
stream.Send() в цикле, затем CloseAndRecv() для ответа.Send() и Recv() одновременно в горутинах.context.WithTimeout перед вызовом RPC.💡 Для Node.js разработчика:
on('data') → Recv(), write() → Send().
new Date(Date.now() + 5000). В Go: context.WithTimeout(ctx, 5s).