Урок 16: Redis — кэширование, Pub/Sub, распределённые блокировки

Урок 16. Redis — кэширование, Pub/Sub, распределённые блокировки

🔄 Node.js → Go (ключевые аналоги):

📋 Что изучаем

📦 Инициализация проекта

mkdir go-redis && cd go-redis
go mod init go-redis

# Устанавливаем go-redis
go get github.com/redis/go-redis/v9
go get github.com/google/uuid
go mod tidy

# Запускаем Redis
docker run -d --name redis-lesson16 \
  -p 6379:6379 \
  redis:7-alpine

💻 Код программы

package main

import (
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "log"
    "sync"
    "time"

    "github.com/google/uuid"
    "github.com/redis/go-redis/v9"
)

// ╔══════════════════════════════════════════════════════════╗
// ║  1. МОДЕЛИ ДАННЫХ                                      ║
// ╚══════════════════════════════════════════════════════════╝

type User struct {
    ID        string    `json:"id"`
    Name      string    `json:"name"`
    Email     string    `json:"email"`
    Plan      string    `json:"plan"`
    UpdatedAt time.Time `json:"updated_at"`
}

// ╔══════════════════════════════════════════════════════════╗
// ║  2. КЭШ-СЕРВИС (Cache-Aside паттерн)                   ║
// ╚══════════════════════════════════════════════════════════╝

// CacheService — сервис кэширования с Redis
type CacheService struct {
    client *redis.Client
}

// NewCacheService — конструктор
func NewCacheService(client *redis.Client) *CacheService {
    return &CacheService{client: client}
}

// GetUser — получает пользователя (сначала кэш, потом "БД")
func (s *CacheService) GetUser(ctx context.Context, userID string) (*User, error) {
    cacheKey := fmt.Sprintf("user:%s", userID)

    // 1. Пытаемся получить из кэша
    data, err := s.client.Get(ctx, cacheKey).Bytes()
    if err == nil {
        // КЭШ ПОПАДАНИЕ (cache hit)
        var user User
        if err := json.Unmarshal(data, &user); err != nil {
            return nil, fmt.Errorf("unmarshal cached user: %w", err)
        }
        log.Printf("  [CACHE HIT] user:%s", userID)
        return &user, nil
    }

    // redis.Nil — ключ не существует
    if !errors.Is(err, redis.Nil) {
        // Реальная ошибка Redis
        return nil, fmt.Errorf("redis get: %w", err)
    }

    // 2. КЭШ ПРОМАХ (cache miss) — "идём в БД"
    log.Printf("  [CACHE MISS] user:%s — запрашиваем БД...", userID)
    user, err := s.fetchUserFromDB(ctx, userID)
    if err != nil {
        return nil, err
    }

    // 3. Сохраняем в кэш (в фоне, не блокируем ответ)
    go s.cacheUser(context.Background(), user)

    return user, nil
}

// cacheUser — сохраняет пользователя в кэш
func (s *CacheService) cacheUser(ctx context.Context, user *User) {
    cacheKey := fmt.Sprintf("user:%s", user.ID)
    data, _ := json.Marshal(user)

    // Set с TTL 5 минут
    if err := s.client.Set(ctx, cacheKey, data, 5*time.Minute).Err(); err != nil {
        log.Printf("  Ошибка кэширования: %v", err)
    }
}

// InvalidateUser — инвалидация кэша (при обновлении пользователя)
func (s *CacheService) InvalidateUser(ctx context.Context, userID string) error {
    cacheKey := fmt.Sprintf("user:%s", userID)
    return s.client.Del(ctx, cacheKey).Err()
}

// InvalidatePattern — инвалидация по паттерну (осторожно в продакшене!)
func (s *CacheService) InvalidatePattern(ctx context.Context, pattern string) error {
    var cursor uint64
    for {
        keys, nextCursor, err := s.client.Scan(ctx, cursor, pattern, 100).Result()
        if err != nil {
            return fmt.Errorf("scan: %w", err)
        }
        if len(keys) > 0 {
            if err := s.client.Del(ctx, keys...).Err(); err != nil {
                return fmt.Errorf("del: %w", err)
            }
        }
        cursor = nextCursor
        if cursor == 0 {
            break
        }
    }
    return nil
}

// fetchUserFromDB — имитация запроса к БД
func (s *CacheService) fetchUserFromDB(ctx context.Context, userID string) (*User, error) {
    // Имитация задержки БД
    time.Sleep(100 * time.Millisecond)

    return &User{
        ID:        userID,
        Name:      fmt.Sprintf("User-%s", userID),
        Email:     fmt.Sprintf("user%s@example.com", userID),
        Plan:      "premium",
        UpdatedAt: time.Now(),
    }, nil
}

// ╔══════════════════════════════════════════════════════════╗
// ║  3. PIPELINE (batch-операции)                          ║
// ╚══════════════════════════════════════════════════════════╝

func (s *CacheService) GetUsersBatch(ctx context.Context, userIDs []string) ([]*User, error) {
    // Pipeline — отправляет команды одним сетевым пакетом
    pipe := s.client.Pipeline()

    // Создаём команды Get для каждого ID
    cmds := make([]*redis.StringCmd, len(userIDs))
    for i, id := range userIDs {
        cmds[i] = pipe.Get(ctx, fmt.Sprintf("user:%s", id))
    }

    // Выполняем все команды разом
    if _, err := pipe.Exec(ctx); err != nil && !errors.Is(err, redis.Nil) {
        return nil, fmt.Errorf("pipeline exec: %w", err)
    }

    // Собираем результаты
    var users []*User
    for i, cmd := range cmds {
        data, err := cmd.Bytes()
        if err != nil {
            if errors.Is(err, redis.Nil) {
                // cache miss — можно сходить в БД
                continue
            }
            return nil, fmt.Errorf("pipeline result: %w", err)
        }
        var user User
        json.Unmarshal(data, &user)
        users = append(users, &user)
        _ = i
    }

    return users, nil
}

// ╔══════════════════════════════════════════════════════════╗
// ║  4. PUB/SUB                                             ║
// ╚══════════════════════════════════════════════════════════╝

// PubSubService — сервис публикации/подписки
type PubSubService struct {
    client *redis.Client
}

func NewPubSubService(client *redis.Client) *PubSubService {
    return &PubSubService{client: client}
}

// Publish — публикует сообщение в канал
func (ps *PubSubService) Publish(ctx context.Context, channel, message string) error {
    return ps.client.Publish(ctx, channel, message).Err()
}

// Subscribe — подписывается на канал и обрабатывает сообщения
func (ps *PubSubService) Subscribe(ctx context.Context, channel string, handler func(msg string)) {
    pubsub := ps.client.Subscribe(ctx, channel)
    defer pubsub.Close()

    // Ждём подтверждения подписки
    if _, err := pubsub.Receive(ctx); err != nil {
        log.Printf("  Ошибка подписки на %s: %v", channel, err)
        return
    }

    log.Printf("  ✓ Подписаны на канал %q", channel)

    // Канал для сообщений
    ch := pubsub.Channel()

    for {
        select {
        case msg, ok := <-ch:
            if !ok {
                return // Канал закрыт
            }
            handler(msg.Payload)
        case <-ctx.Done():
            return
        }
    }
}

// ╔══════════════════════════════════════════════════════════╗
// ║  5. РАСПРЕДЕЛЁННАЯ БЛОКИРОВКА (Distributed Lock)     ║
// ╚══════════════════════════════════════════════════════════╝

// DistributedLock — простая распределённая блокировка на Redis
type DistributedLock struct {
    client *redis.Client
    key    string
    value  string // Уникальный ID владельца блокировки
    ttl    time.Duration
}

// NewDistributedLock — конструктор
func NewDistributedLock(client *redis.Client, key string, ttl time.Duration) *DistributedLock {
    return &DistributedLock{
        client: client,
        key:    fmt.Sprintf("lock:%s", key),
        value:  uuid.New().String(), // Уникальный ID для безопасного снятия
        ttl:    ttl,
    }
}

// Acquire — захватывает блокировку
func (l *DistributedLock) Acquire(ctx context.Context) (bool, error) {
    // SET key value NX EX ttl
    // NX — установить только если ключ НЕ существует
    // EX — время жизни в секундах
    ok, err := l.client.SetNX(ctx, l.key, l.value, l.ttl).Result()
    if err != nil {
        return false, fmt.Errorf("acquire lock: %w", err)
    }
    return ok, nil
}

// Release — освобождает блокировку (только если мы владелец!)
func (l *DistributedLock) Release(ctx context.Context) error {
    // Lua-скрипт для атомарной проверки владельца и удаления
    script := `
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        else
            return 0
        end
    `
    result, err := l.client.Eval(ctx, script, []string{l.key}, l.value).Result()
    if err != nil {
        return fmt.Errorf("release lock: %w", err)
    }
    if result.(int64) == 0 {
        return errors.New("lock lost (expired or owned by another)")
    }
    return nil
}

// Extend — продлевает блокировку
func (l *DistributedLock) Extend(ctx context.Context) error {
    script := `
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("EXPIRE", KEYS[1], ARGV[2])
        else
            return 0
        end
    `
    result, err := l.client.Eval(ctx, script, []string{l.key}, l.value, int(l.ttl.Seconds())).Result()
    if err != nil {
        return fmt.Errorf("extend lock: %w", err)
    }
    if result.(int64) == 0 {
        return errors.New("cannot extend: not lock owner")
    }
    return nil
}

// ╔══════════════════════════════════════════════════════════╗
// ║  6. REDIS STREAMS (основы)                             ║
// ╚══════════════════════════════════════════════════════════╝

// StreamService — работа с Redis Streams
type StreamService struct {
    client *redis.Client
}

func NewStreamService(client *redis.Client) *StreamService {
    return &StreamService{client: client}
}

// AddEvent — добавляет событие в стрим
func (s *StreamService) AddEvent(ctx context.Context, stream string, event map[string]interface{}) (string, error) {
    id, err := s.client.XAdd(ctx, &redis.XAddArgs{
        Stream: stream,
        Values: event,
    }).Result()
    if err != nil {
        return "", fmt.Errorf("xadd: %w", err)
    }
    return id, nil
}

// ReadEvents — читает события из стрима (неблокирующее)
func (s *StreamService) ReadEvents(ctx context.Context, stream, lastID string, count int64) ([]redis.XMessage, error) {
    result, err := s.client.XRead(ctx, &redis.XReadArgs{
        Streams: []string{stream, lastID},
        Count:   count,
        Block:   0, // 0 = неблокирующее
    }).Result()
    if err != nil {
        if errors.Is(err, redis.Nil) {
            return nil, nil // Нет новых сообщений
        }
        return nil, fmt.Errorf("xread: %w", err)
    }

    if len(result) > 0 {
        return result[0].Messages, nil
    }
    return nil, nil
}

// CreateConsumerGroup — создаёт consumer group
func (s *StreamService) CreateConsumerGroup(ctx context.Context, stream, group string) error {
    err := s.client.XGroupCreateMkStream(ctx, stream, group, "0").Err()
    if err != nil && !strings.Contains(err.Error(), "BUSYGROUP") {
        return fmt.Errorf("xgroup create: %w", err)
    }
    return nil
}

// ╔══════════════════════════════════════════════════════════╗
// ║  MAIN                                                    ║
// ╚══════════════════════════════════════════════════════════╝

func main() {
    log.SetFlags(log.LstdFlags | log.Lmicroseconds)
    log.Println("🔴 Redis — кэширование, Pub/Sub, блокировки, стримы")

    ctx := context.Background()

    // Подключение к Redis
    client := redis.NewClient(&redis.Options{
        Addr:         "localhost:6379",
        Password:     "",
        DB:           0,
        PoolSize:     20,
        MinIdleConns: 5,
        MaxRetries:   3,
        DialTimeout:  5 * time.Second,
        ReadTimeout:  3 * time.Second,
        WriteTimeout: 3 * time.Second,
    })

    // Проверка соединения
    if err := client.Ping(ctx).Err(); err != nil {
        log.Fatalf("Redis подключение: %v", err)
    }
    log.Println("✅ Подключено к Redis")

    // Очищаем тестовые ключи
    defer client.FlushDB(ctx)

    // ==========================================
    // 1. КЭШИРОВАНИЕ (Cache-Aside)
    // ==========================================
    log.Println("\n── 1. КЭШИРОВАНИЕ ──")
    cache := NewCacheService(client)

    // Первый вызов — cache miss
    user1, _ := cache.GetUser(ctx, "42")
    log.Printf("  Пользователь: %s <%s>", user1.Name, user1.Email)

    // Второй вызов — cache hit
    user2, _ := cache.GetUser(ctx, "42")
    log.Printf("  Пользователь (из кэша): %s", user2.Name)

    // Инвалидация
    cache.InvalidateUser(ctx, "42")
    log.Println("  ✓ Кэш инвалидирован")

    // Третий вызов — снова cache miss
    cache.GetUser(ctx, "42")

    // ==========================================
    // 2. PIPELINE
    // ==========================================
    log.Println("\n── 2. PIPELINE ──")
    // Заранее кэшируем несколько пользователей
    for _, id := range []string{"1", "2", "3"} {
        cache.GetUser(ctx, id) // Прогреваем кэш
    }

    users, err := cache.GetUsersBatch(ctx, []string{"1", "2", "3", "99"})
    if err != nil {
        log.Printf("  Ошибка pipeline: %v", err)
    } else {
        log.Printf("  Получено пользователей: %d", len(users))
    }

    // ==========================================
    // 3. PUB/SUB
    // ==========================================
    log.Println("\n── 3. PUB/SUB ──")
    pubsub := NewPubSubService(client)

    // Создаём контекст с отменой для подписчика
    subCtx, subCancel := context.WithCancel(ctx)

    // Запускаем подписчика в горутине
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        pubsub.Subscribe(subCtx, "notifications", func(msg string) {
            log.Printf("  📩 Получено сообщение: %s", msg)
        })
    }()

    // Даём время на подписку
    time.Sleep(100 * time.Millisecond)

    // Публикуем несколько сообщений
    for i := 1; i <= 3; i++ {
        msg := fmt.Sprintf("Уведомление #%d: %s", i, time.Now().Format("15:04:05"))
        pubsub.Publish(ctx, "notifications", msg)
        time.Sleep(200 * time.Millisecond)
    }

    // Отменяем подписку
    subCancel()
    wg.Wait()

    // ==========================================
    // 4. РАСПРЕДЕЛЁННАЯ БЛОКИРОВКА
    // ==========================================
    log.Println("\n── 4. РАСПРЕДЕЛЁННАЯ БЛОКИРОВКА ──")

    lock := NewDistributedLock(client, "resource:order:42", 10*time.Second)

    // Захватываем блокировку
    acquired, err := lock.Acquire(ctx)
    if err != nil {
        log.Printf("  Ошибка захвата: %v", err)
    } else if acquired {
        log.Println("  ✓ Блокировка захвачена")

        // Имитация работы под блокировкой
        time.Sleep(500 * time.Millisecond)

        // Продлеваем блокировку
        if err := lock.Extend(ctx); err != nil {
            log.Printf("  Ошибка продления: %v", err)
        } else {
            log.Println("  ✓ Блокировка продлена")
        }

        // Освобождаем
        if err := lock.Release(ctx); err != nil {
            log.Printf("  Ошибка освобождения: %v", err)
        } else {
            log.Println("  ✓ Блокировка освобождена")
        }
    } else {
        log.Println("  ❌ Блокировка занята другим процессом")
    }

    // Попытка повторного захвата (должна быть успешной)
    acquired2, _ := lock.Acquire(ctx)
    log.Printf("  Повторный захват: %v (ожидается true)", acquired2)
    lock.Release(ctx)

    // ==========================================
    // 5. REDIS STREAMS
    // ==========================================
    log.Println("\n── 5. REDIS STREAMS ──")
    streams := NewStreamService(client)

    // Добавляем события
    for i := 1; i <= 5; i++ {
        id, _ := streams.AddEvent(ctx, "orders", map[string]interface{}{
            "order_id": fmt.Sprintf("order-%d", i),
            "amount":   i * 100,
            "status":   "created",
        })
        log.Printf("  ✓ Событие добавлено: %s", id)
    }

    // Читаем все события с начала
    messages, err := streams.ReadEvents(ctx, "orders", "0", 10)
    if err != nil {
        log.Printf("  Ошибка чтения: %v", err)
    } else {
        log.Printf("  Прочитано событий: %d", len(messages))
        for _, msg := range messages {
            log.Printf("    %s: %v", msg.ID, msg.Values)
        }
    }

    // Читаем только новые (с последнего ID)
    if len(messages) > 0 {
        lastID := messages[len(messages)-1].ID
        newMessages, _ := streams.ReadEvents(ctx, "orders", lastID, 10)
        log.Printf("  Новых событий после %s: %d", lastID, len(newMessages))
    }

    log.Println("\n✅ Демонстрация завершена!")
}

// Добавить в импорты
import "strings"

⚠️ Важно: Добавьте "strings" в блок импортов в начале файла.

🧪 Тестирование

# Запускаем Redis
docker run -d --name redis-lesson16 -p 6379:6379 redis:7-alpine

# Запускаем программу
go run main.go

# Проверяем данные в Redis
docker exec -it redis-lesson16 redis-cli

# В redis-cli:
KEYS *
GET user:42
XLEN orders
XRANGE orders - +

📊 Команды Redis в go-redis

ОперацияGoNode.js (ioredis)
Getclient.Get(ctx, key).Result()await redis.get(key)
Set с TTLclient.Set(ctx, key, val, ttl)await redis.set(key, val, 'EX', ttl)
Delclient.Del(ctx, keys...)await redis.del(key)
Existsclient.Exists(ctx, key)await redis.exists(key)
Publishclient.Publish(ctx, ch, msg)await redis.publish(ch, msg)
Subscribeclient.Subscribe(ctx, ch)redis.subscribe(ch)
SetNXclient.SetNX(ctx, key, val, ttl)await redis.set(key, val, 'NX', 'EX', ttl)
Pipelinepipe := client.Pipeline()redis.pipeline()

🚀 Запуск программы

go run main.go

# Сборка
go build -o redis-demo main.go
./redis-demo
⚠️ Типичные ошибки:
💡 Практический совет:

💡 Best practices от сеньоров:

🔑 Ключевые концепции

Что нужно запомнить из этого урока:

💡 Для Node.js разработчика:

← Предыдущий урок Следующий урок →