ioredis / redis (npm) → github.com/redis/go-redis/v9
await redis.get(key) → client.Get(ctx, key).Result()
await redis.set(key, val, 'EX', ttl) → client.Set(ctx, key, val, ttl)
redis.publish(channel, msg) → client.Publish(ctx, channel, msg)
redlock (npm) → своя реализация через SET NX
redis streams → XAdd, XRead, XGroup
mkdir go-redis && cd go-redis
go mod init go-redis
# Устанавливаем go-redis
go get github.com/redis/go-redis/v9
go get github.com/google/uuid
go mod tidy
# Запускаем Redis
docker run -d --name redis-lesson16 \
-p 6379:6379 \
redis:7-alpine
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"sync"
"time"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
)
// ╔══════════════════════════════════════════════════════════╗
// ║ 1. МОДЕЛИ ДАННЫХ ║
// ╚══════════════════════════════════════════════════════════╝
type User struct {
ID string `json:"id"`
Name string `json:"name"`
Email string `json:"email"`
Plan string `json:"plan"`
UpdatedAt time.Time `json:"updated_at"`
}
// ╔══════════════════════════════════════════════════════════╗
// ║ 2. КЭШ-СЕРВИС (Cache-Aside паттерн) ║
// ╚══════════════════════════════════════════════════════════╝
// CacheService — сервис кэширования с Redis
type CacheService struct {
client *redis.Client
}
// NewCacheService — конструктор
func NewCacheService(client *redis.Client) *CacheService {
return &CacheService{client: client}
}
// GetUser — получает пользователя (сначала кэш, потом "БД")
func (s *CacheService) GetUser(ctx context.Context, userID string) (*User, error) {
cacheKey := fmt.Sprintf("user:%s", userID)
// 1. Пытаемся получить из кэша
data, err := s.client.Get(ctx, cacheKey).Bytes()
if err == nil {
// КЭШ ПОПАДАНИЕ (cache hit)
var user User
if err := json.Unmarshal(data, &user); err != nil {
return nil, fmt.Errorf("unmarshal cached user: %w", err)
}
log.Printf(" [CACHE HIT] user:%s", userID)
return &user, nil
}
// redis.Nil — ключ не существует
if !errors.Is(err, redis.Nil) {
// Реальная ошибка Redis
return nil, fmt.Errorf("redis get: %w", err)
}
// 2. КЭШ ПРОМАХ (cache miss) — "идём в БД"
log.Printf(" [CACHE MISS] user:%s — запрашиваем БД...", userID)
user, err := s.fetchUserFromDB(ctx, userID)
if err != nil {
return nil, err
}
// 3. Сохраняем в кэш (в фоне, не блокируем ответ)
go s.cacheUser(context.Background(), user)
return user, nil
}
// cacheUser — сохраняет пользователя в кэш
func (s *CacheService) cacheUser(ctx context.Context, user *User) {
cacheKey := fmt.Sprintf("user:%s", user.ID)
data, _ := json.Marshal(user)
// Set с TTL 5 минут
if err := s.client.Set(ctx, cacheKey, data, 5*time.Minute).Err(); err != nil {
log.Printf(" Ошибка кэширования: %v", err)
}
}
// InvalidateUser — инвалидация кэша (при обновлении пользователя)
func (s *CacheService) InvalidateUser(ctx context.Context, userID string) error {
cacheKey := fmt.Sprintf("user:%s", userID)
return s.client.Del(ctx, cacheKey).Err()
}
// InvalidatePattern — инвалидация по паттерну (осторожно в продакшене!)
func (s *CacheService) InvalidatePattern(ctx context.Context, pattern string) error {
var cursor uint64
for {
keys, nextCursor, err := s.client.Scan(ctx, cursor, pattern, 100).Result()
if err != nil {
return fmt.Errorf("scan: %w", err)
}
if len(keys) > 0 {
if err := s.client.Del(ctx, keys...).Err(); err != nil {
return fmt.Errorf("del: %w", err)
}
}
cursor = nextCursor
if cursor == 0 {
break
}
}
return nil
}
// fetchUserFromDB — имитация запроса к БД
func (s *CacheService) fetchUserFromDB(ctx context.Context, userID string) (*User, error) {
// Имитация задержки БД
time.Sleep(100 * time.Millisecond)
return &User{
ID: userID,
Name: fmt.Sprintf("User-%s", userID),
Email: fmt.Sprintf("user%s@example.com", userID),
Plan: "premium",
UpdatedAt: time.Now(),
}, nil
}
// ╔══════════════════════════════════════════════════════════╗
// ║ 3. PIPELINE (batch-операции) ║
// ╚══════════════════════════════════════════════════════════╝
func (s *CacheService) GetUsersBatch(ctx context.Context, userIDs []string) ([]*User, error) {
// Pipeline — отправляет команды одним сетевым пакетом
pipe := s.client.Pipeline()
// Создаём команды Get для каждого ID
cmds := make([]*redis.StringCmd, len(userIDs))
for i, id := range userIDs {
cmds[i] = pipe.Get(ctx, fmt.Sprintf("user:%s", id))
}
// Выполняем все команды разом
if _, err := pipe.Exec(ctx); err != nil && !errors.Is(err, redis.Nil) {
return nil, fmt.Errorf("pipeline exec: %w", err)
}
// Собираем результаты
var users []*User
for i, cmd := range cmds {
data, err := cmd.Bytes()
if err != nil {
if errors.Is(err, redis.Nil) {
// cache miss — можно сходить в БД
continue
}
return nil, fmt.Errorf("pipeline result: %w", err)
}
var user User
json.Unmarshal(data, &user)
users = append(users, &user)
_ = i
}
return users, nil
}
// ╔══════════════════════════════════════════════════════════╗
// ║ 4. PUB/SUB ║
// ╚══════════════════════════════════════════════════════════╝
// PubSubService — сервис публикации/подписки
type PubSubService struct {
client *redis.Client
}
func NewPubSubService(client *redis.Client) *PubSubService {
return &PubSubService{client: client}
}
// Publish — публикует сообщение в канал
func (ps *PubSubService) Publish(ctx context.Context, channel, message string) error {
return ps.client.Publish(ctx, channel, message).Err()
}
// Subscribe — подписывается на канал и обрабатывает сообщения
func (ps *PubSubService) Subscribe(ctx context.Context, channel string, handler func(msg string)) {
pubsub := ps.client.Subscribe(ctx, channel)
defer pubsub.Close()
// Ждём подтверждения подписки
if _, err := pubsub.Receive(ctx); err != nil {
log.Printf(" Ошибка подписки на %s: %v", channel, err)
return
}
log.Printf(" ✓ Подписаны на канал %q", channel)
// Канал для сообщений
ch := pubsub.Channel()
for {
select {
case msg, ok := <-ch:
if !ok {
return // Канал закрыт
}
handler(msg.Payload)
case <-ctx.Done():
return
}
}
}
// ╔══════════════════════════════════════════════════════════╗
// ║ 5. РАСПРЕДЕЛЁННАЯ БЛОКИРОВКА (Distributed Lock) ║
// ╚══════════════════════════════════════════════════════════╝
// DistributedLock — простая распределённая блокировка на Redis
type DistributedLock struct {
client *redis.Client
key string
value string // Уникальный ID владельца блокировки
ttl time.Duration
}
// NewDistributedLock — конструктор
func NewDistributedLock(client *redis.Client, key string, ttl time.Duration) *DistributedLock {
return &DistributedLock{
client: client,
key: fmt.Sprintf("lock:%s", key),
value: uuid.New().String(), // Уникальный ID для безопасного снятия
ttl: ttl,
}
}
// Acquire — захватывает блокировку
func (l *DistributedLock) Acquire(ctx context.Context) (bool, error) {
// SET key value NX EX ttl
// NX — установить только если ключ НЕ существует
// EX — время жизни в секундах
ok, err := l.client.SetNX(ctx, l.key, l.value, l.ttl).Result()
if err != nil {
return false, fmt.Errorf("acquire lock: %w", err)
}
return ok, nil
}
// Release — освобождает блокировку (только если мы владелец!)
func (l *DistributedLock) Release(ctx context.Context) error {
// Lua-скрипт для атомарной проверки владельца и удаления
script := `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`
result, err := l.client.Eval(ctx, script, []string{l.key}, l.value).Result()
if err != nil {
return fmt.Errorf("release lock: %w", err)
}
if result.(int64) == 0 {
return errors.New("lock lost (expired or owned by another)")
}
return nil
}
// Extend — продлевает блокировку
func (l *DistributedLock) Extend(ctx context.Context) error {
script := `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("EXPIRE", KEYS[1], ARGV[2])
else
return 0
end
`
result, err := l.client.Eval(ctx, script, []string{l.key}, l.value, int(l.ttl.Seconds())).Result()
if err != nil {
return fmt.Errorf("extend lock: %w", err)
}
if result.(int64) == 0 {
return errors.New("cannot extend: not lock owner")
}
return nil
}
// ╔══════════════════════════════════════════════════════════╗
// ║ 6. REDIS STREAMS (основы) ║
// ╚══════════════════════════════════════════════════════════╝
// StreamService — работа с Redis Streams
type StreamService struct {
client *redis.Client
}
func NewStreamService(client *redis.Client) *StreamService {
return &StreamService{client: client}
}
// AddEvent — добавляет событие в стрим
func (s *StreamService) AddEvent(ctx context.Context, stream string, event map[string]interface{}) (string, error) {
id, err := s.client.XAdd(ctx, &redis.XAddArgs{
Stream: stream,
Values: event,
}).Result()
if err != nil {
return "", fmt.Errorf("xadd: %w", err)
}
return id, nil
}
// ReadEvents — читает события из стрима (неблокирующее)
func (s *StreamService) ReadEvents(ctx context.Context, stream, lastID string, count int64) ([]redis.XMessage, error) {
result, err := s.client.XRead(ctx, &redis.XReadArgs{
Streams: []string{stream, lastID},
Count: count,
Block: 0, // 0 = неблокирующее
}).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
return nil, nil // Нет новых сообщений
}
return nil, fmt.Errorf("xread: %w", err)
}
if len(result) > 0 {
return result[0].Messages, nil
}
return nil, nil
}
// CreateConsumerGroup — создаёт consumer group
func (s *StreamService) CreateConsumerGroup(ctx context.Context, stream, group string) error {
err := s.client.XGroupCreateMkStream(ctx, stream, group, "0").Err()
if err != nil && !strings.Contains(err.Error(), "BUSYGROUP") {
return fmt.Errorf("xgroup create: %w", err)
}
return nil
}
// ╔══════════════════════════════════════════════════════════╗
// ║ MAIN ║
// ╚══════════════════════════════════════════════════════════╝
func main() {
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
log.Println("🔴 Redis — кэширование, Pub/Sub, блокировки, стримы")
ctx := context.Background()
// Подключение к Redis
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
PoolSize: 20,
MinIdleConns: 5,
MaxRetries: 3,
DialTimeout: 5 * time.Second,
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
})
// Проверка соединения
if err := client.Ping(ctx).Err(); err != nil {
log.Fatalf("Redis подключение: %v", err)
}
log.Println("✅ Подключено к Redis")
// Очищаем тестовые ключи
defer client.FlushDB(ctx)
// ==========================================
// 1. КЭШИРОВАНИЕ (Cache-Aside)
// ==========================================
log.Println("\n── 1. КЭШИРОВАНИЕ ──")
cache := NewCacheService(client)
// Первый вызов — cache miss
user1, _ := cache.GetUser(ctx, "42")
log.Printf(" Пользователь: %s <%s>", user1.Name, user1.Email)
// Второй вызов — cache hit
user2, _ := cache.GetUser(ctx, "42")
log.Printf(" Пользователь (из кэша): %s", user2.Name)
// Инвалидация
cache.InvalidateUser(ctx, "42")
log.Println(" ✓ Кэш инвалидирован")
// Третий вызов — снова cache miss
cache.GetUser(ctx, "42")
// ==========================================
// 2. PIPELINE
// ==========================================
log.Println("\n── 2. PIPELINE ──")
// Заранее кэшируем несколько пользователей
for _, id := range []string{"1", "2", "3"} {
cache.GetUser(ctx, id) // Прогреваем кэш
}
users, err := cache.GetUsersBatch(ctx, []string{"1", "2", "3", "99"})
if err != nil {
log.Printf(" Ошибка pipeline: %v", err)
} else {
log.Printf(" Получено пользователей: %d", len(users))
}
// ==========================================
// 3. PUB/SUB
// ==========================================
log.Println("\n── 3. PUB/SUB ──")
pubsub := NewPubSubService(client)
// Создаём контекст с отменой для подписчика
subCtx, subCancel := context.WithCancel(ctx)
// Запускаем подписчика в горутине
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
pubsub.Subscribe(subCtx, "notifications", func(msg string) {
log.Printf(" 📩 Получено сообщение: %s", msg)
})
}()
// Даём время на подписку
time.Sleep(100 * time.Millisecond)
// Публикуем несколько сообщений
for i := 1; i <= 3; i++ {
msg := fmt.Sprintf("Уведомление #%d: %s", i, time.Now().Format("15:04:05"))
pubsub.Publish(ctx, "notifications", msg)
time.Sleep(200 * time.Millisecond)
}
// Отменяем подписку
subCancel()
wg.Wait()
// ==========================================
// 4. РАСПРЕДЕЛЁННАЯ БЛОКИРОВКА
// ==========================================
log.Println("\n── 4. РАСПРЕДЕЛЁННАЯ БЛОКИРОВКА ──")
lock := NewDistributedLock(client, "resource:order:42", 10*time.Second)
// Захватываем блокировку
acquired, err := lock.Acquire(ctx)
if err != nil {
log.Printf(" Ошибка захвата: %v", err)
} else if acquired {
log.Println(" ✓ Блокировка захвачена")
// Имитация работы под блокировкой
time.Sleep(500 * time.Millisecond)
// Продлеваем блокировку
if err := lock.Extend(ctx); err != nil {
log.Printf(" Ошибка продления: %v", err)
} else {
log.Println(" ✓ Блокировка продлена")
}
// Освобождаем
if err := lock.Release(ctx); err != nil {
log.Printf(" Ошибка освобождения: %v", err)
} else {
log.Println(" ✓ Блокировка освобождена")
}
} else {
log.Println(" ❌ Блокировка занята другим процессом")
}
// Попытка повторного захвата (должна быть успешной)
acquired2, _ := lock.Acquire(ctx)
log.Printf(" Повторный захват: %v (ожидается true)", acquired2)
lock.Release(ctx)
// ==========================================
// 5. REDIS STREAMS
// ==========================================
log.Println("\n── 5. REDIS STREAMS ──")
streams := NewStreamService(client)
// Добавляем события
for i := 1; i <= 5; i++ {
id, _ := streams.AddEvent(ctx, "orders", map[string]interface{}{
"order_id": fmt.Sprintf("order-%d", i),
"amount": i * 100,
"status": "created",
})
log.Printf(" ✓ Событие добавлено: %s", id)
}
// Читаем все события с начала
messages, err := streams.ReadEvents(ctx, "orders", "0", 10)
if err != nil {
log.Printf(" Ошибка чтения: %v", err)
} else {
log.Printf(" Прочитано событий: %d", len(messages))
for _, msg := range messages {
log.Printf(" %s: %v", msg.ID, msg.Values)
}
}
// Читаем только новые (с последнего ID)
if len(messages) > 0 {
lastID := messages[len(messages)-1].ID
newMessages, _ := streams.ReadEvents(ctx, "orders", lastID, 10)
log.Printf(" Новых событий после %s: %d", lastID, len(newMessages))
}
log.Println("\n✅ Демонстрация завершена!")
}
// Добавить в импорты
import "strings"
⚠️ Важно: Добавьте "strings" в блок импортов в начале файла.
# Запускаем Redis
docker run -d --name redis-lesson16 -p 6379:6379 redis:7-alpine
# Запускаем программу
go run main.go
# Проверяем данные в Redis
docker exec -it redis-lesson16 redis-cli
# В redis-cli:
KEYS *
GET user:42
XLEN orders
XRANGE orders - +
| Операция | Go | Node.js (ioredis) |
|---|---|---|
| Get | client.Get(ctx, key).Result() | await redis.get(key) |
| Set с TTL | client.Set(ctx, key, val, ttl) | await redis.set(key, val, 'EX', ttl) |
| Del | client.Del(ctx, keys...) | await redis.del(key) |
| Exists | client.Exists(ctx, key) | await redis.exists(key) |
| Publish | client.Publish(ctx, ch, msg) | await redis.publish(ch, msg) |
| Subscribe | client.Subscribe(ctx, ch) | redis.subscribe(ch) |
| SetNX | client.SetNX(ctx, key, val, ttl) | await redis.set(key, val, 'NX', 'EX', ttl) |
| Pipeline | pipe := client.Pipeline() | redis.pipeline() |
go run main.go
# Сборка
go build -o redis-demo main.go
./redis-demo
redis.Nil — ключ не найден (норма). Другие ошибки — проблемы с соединением.
💡 Best practices от сеньоров:
💡 Для Node.js разработчика:
go-redis — прямой аналог ioredis. API очень похоже.
await — возврат .Result() или .Err().