Урок 31: Идемпотентность, дедупликация и идемпотентные ключи

Урок 31. Идемпотентность, дедупликация и идемпотентные ключи

🔄 Node.js → Go (ключевые аналоги):

📋 Что изучаем

📦 Инициализация проекта

mkdir go-idempotency && cd go-idempotency
go mod init go-idempotency

# Устанавливаем зависимости
go get github.com/redis/go-redis/v9
go get github.com/jackc/pgx/v5
go get github.com/jackc/pgx/v5/pgxpool
go get github.com/google/uuid
go get github.com/stretchr/testify
go mod tidy

# Запускаем Redis и PostgreSQL (для демонстрации)
docker run -d --name redis-idempotency -p 6379:6379 redis:7-alpine
docker run -d --name pg-idempotency \
  -e POSTGRES_PASSWORD=secret -e POSTGRES_DB=idemdb \
  -p 5432:5432 postgres:16-alpine

# Структура
mkdir -p internal/idempotency
mkdir -p internal/middleware
mkdir -p cmd/demo

💻 Файл: internal/idempotency/redis.go

package idempotency

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/redis/go-redis/v9"
)

// ╔══════════════════════════════════════════════════════════╗
// ║  ИДЕМПОТЕНТНОСТЬ ЧЕРЕЗ REDIS                           ║
// ╚══════════════════════════════════════════════════════════╝

// RedisStore — хранилище идемпотентных ключей в Redis
type RedisStore struct {
    client *redis.Client
    ttl    time.Duration // Сколько хранить ключ
}

// NewRedisStore — создаёт хранилище
func NewRedisStore(client *redis.Client, ttl time.Duration) *RedisStore {
    return &RedisStore{client: client, ttl: ttl}
}

// ProcessedResult — результат обработки (сохраняется в Redis)
type ProcessedResult struct {
    Status    string    `json:"status"`    // "processing", "completed", "failed"
    Response  string    `json:"response"`  // JSON-ответ
    CreatedAt time.Time `json:"created_at"`
}

// IsDuplicate — проверяет, был ли ключ уже обработан
// Возвращает: isDuplicate, cachedResponse, error
func (s *RedisStore) IsDuplicate(
    ctx context.Context,
    idempotencyKey string,
) (bool, *ProcessedResult, error) {
    data, err := s.client.Get(ctx, s.redisKey(idempotencyKey)).Bytes()
    if err != nil {
        if err == redis.Nil {
            return false, nil, nil // Ключ не найден — не дубликат
        }
        return false, nil, fmt.Errorf("redis get: %w", err)
    }

    var result ProcessedResult
    if err := json.Unmarshal(data, &result); err != nil {
        return false, nil, fmt.Errorf("unmarshal result: %w", err)
    }

    return true, &result, nil
}

// MarkProcessing — помечает ключ как "в обработке"
func (s *RedisStore) MarkProcessing(ctx context.Context, idempotencyKey string) error {
    result := ProcessedResult{
        Status:    "processing",
        CreatedAt: time.Now(),
    }
    return s.save(ctx, idempotencyKey, result)
}

// MarkCompleted — помечает ключ как "обработан" с ответом
func (s *RedisStore) MarkCompleted(ctx context.Context, idempotencyKey string, response string) error {
    result := ProcessedResult{
        Status:    "completed",
        Response:  response,
        CreatedAt: time.Now(),
    }
    return s.save(ctx, idempotencyKey, result)
}

// MarkFailed — помечает ключ как "ошибка"
func (s *RedisStore) MarkFailed(ctx context.Context, idempotencyKey string, errMsg string) error {
    result := ProcessedResult{
        Status:    "failed",
        Response:  errMsg,
        CreatedAt: time.Now(),
    }
    return s.save(ctx, idempotencyKey, result)
}

// Delete — удаляет ключ (для тестов)
func (s *RedisStore) Delete(ctx context.Context, idempotencyKey string) error {
    return s.client.Del(ctx, s.redisKey(idempotencyKey)).Err()
}

// save — сохраняет результат в Redis
func (s *RedisStore) save(ctx context.Context, key string, result ProcessedResult) error {
    data, err := json.Marshal(result)
    if err != nil {
        return fmt.Errorf("marshal result: %w", err)
    }

    // SetNX гарантирует, что мы не перезапишем существующий ключ
    ok, err := s.client.SetNX(ctx, s.redisKey(key), data, s.ttl).Result()
    if err != nil {
        return fmt.Errorf("redis setnx: %w", err)
    }
    if !ok {
        // Ключ уже существует (другой процесс уже обрабатывает)
        // Это нормально — возвращаем ошибку, вызывающий код проверит IsDuplicate
        return fmt.Errorf("idempotency key already exists")
    }
    return nil
}

func (s *RedisStore) redisKey(key string) string {
    return fmt.Sprintf("idempotency:%s", key)
}

💻 Файл: internal/idempotency/postgres.go

package idempotency

import (
    "context"
    "fmt"
    "time"

    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgx/v5/pgxpool"
)

// ╔══════════════════════════════════════════════════════════╗
// ║  ИДЕМПОТЕНТНОСТЬ ЧЕРЕЗ POSTGRESQL                     ║
// ╚══════════════════════════════════════════════════════════╝

// PGStore — хранилище идемпотентных ключей в PostgreSQL
type PGStore struct {
    pool *pgxpool.Pool
}

// NewPGStore — создаёт хранилище
func NewPGStore(pool *pgxpool.Pool) *PGStore {
    return &PGStore{pool: pool}
}

// InitSchema — создаёт таблицу для идемпотентных ключей
func (s *PGStore) InitSchema(ctx context.Context) error {
    query := `
        CREATE TABLE IF NOT EXISTS idempotency_keys (
            key         TEXT PRIMARY KEY,
            status      TEXT NOT NULL DEFAULT 'processing',
            response    TEXT,
            created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
            updated_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
        );

        -- TTL: удаляем старые ключи (храним 24 часа)
        CREATE INDEX IF NOT EXISTS idx_idempotency_created
            ON idempotency_keys(created_at)
            WHERE status IN ('completed', 'failed');
    `
    _, err := s.pool.Exec(ctx, query)
    return err
}

// IsDuplicate — проверяет, был ли ключ обработан
func (s *PGStore) IsDuplicate(ctx context.Context, key string) (bool, *ProcessedResult, error) {
    var result ProcessedResult
    err := s.pool.QueryRow(ctx,
        `SELECT status, COALESCE(response, ''), created_at
         FROM idempotency_keys WHERE key = $1`,
        key,
    ).Scan(&result.Status, &result.Response, &result.CreatedAt)

    if err != nil {
        if err == pgx.ErrNoRows {
            return false, nil, nil
        }
        return false, nil, fmt.Errorf("pg select: %w", err)
    }

    return true, &result, nil
}

// InsertKey — вставляет новый ключ (ON CONFLICT DO NOTHING)
func (s *PGStore) InsertKey(ctx context.Context, key string) (bool, error) {
    // ON CONFLICT DO NOTHING — если ключ уже есть, ничего не делаем
    result, err := s.pool.Exec(ctx,
        `INSERT INTO idempotency_keys (key, status)
         VALUES ($1, 'processing')
         ON CONFLICT (key) DO NOTHING`,
        key,
    )
    if err != nil {
        return false, fmt.Errorf("insert key: %w", err)
    }

    // Если RowsAffected == 0 — ключ уже существует
    return result.RowsAffected() > 0, nil
}

// MarkCompleted — помечает ключ как обработанный
func (s *PGStore) MarkCompleted(ctx context.Context, key, response string) error {
    _, err := s.pool.Exec(ctx,
        `UPDATE idempotency_keys
         SET status = 'completed', response = $2, updated_at = NOW()
         WHERE key = $1`,
        key, response,
    )
    return err
}

// CleanupOldKeys — удаляет старые ключи (можно запускать по таймеру)
func (s *PGStore) CleanupOldKeys(ctx context.Context, olderThan time.Duration) error {
    _, err := s.pool.Exec(ctx,
        `DELETE FROM idempotency_keys
         WHERE created_at < $1 AND status IN ('completed', 'failed')`,
        time.Now().Add(-olderThan),
    )
    return err
}

💻 Файл: internal/idempotency/service.go

package idempotency

import (
    "context"
    "errors"
    "fmt"
    "log"
    "time"
)

// ╔══════════════════════════════════════════════════════════╗
// ║  СЕРВИС С ИДЕМПОТЕНТНОСТЬЮ                             ║
// ╚══════════════════════════════════════════════════════════╝

// PaymentRequest — пример запроса
type PaymentRequest struct {
    IdempotencyKey string  `json:"idempotency_key"`
    OrderID        string  `json:"order_id"`
    Amount         float64 `json:"amount"`
    Currency       string  `json:"currency"`
}

// PaymentResponse — пример ответа
type PaymentResponse struct {
    PaymentID string `json:"payment_id"`
    Status    string `json:"status"`
    Message   string `json:"message"`
}

// PaymentService — сервис платежей с идемпотентностью
type PaymentService struct {
    store *RedisStore
}

// NewPaymentService — конструктор
func NewPaymentService(store *RedisStore) *PaymentService {
    return &PaymentService{store: store}
}

// ProcessPayment — обрабатывает платёж идемпотентно
func (s *PaymentService) ProcessPayment(
    ctx context.Context,
    req PaymentRequest,
) (*PaymentResponse, error) {
    // 1. Проверяем, не обработан ли уже этот ключ
    isDup, cached, err := s.store.IsDuplicate(ctx, req.IdempotencyKey)
    if err != nil {
        return nil, fmt.Errorf("check duplicate: %w", err)
    }

    if isDup {
        log.Printf("🔄 Дубликат запроса: key=%s status=%s", req.IdempotencyKey, cached.Status)

        switch cached.Status {
        case "completed":
            // Возвращаем закэшированный ответ
            // (в реальности — десериализовать из cached.Response)
            return &PaymentResponse{
                PaymentID: "cached-" + req.IdempotencyKey[:8],
                Status:    "success",
                Message:   "Payment already processed (cached)",
            }, nil
        case "processing":
            // Другой процесс обрабатывает — возвращаем ошибку
            return nil, errors.New("payment is being processed, please retry")
        case "failed":
            // Предыдущая попытка провалилась — пробуем снова
            log.Printf("🔄 Повтор после неудачи: key=%s", req.IdempotencyKey)
        }
    }

    // 2. Помечаем "в обработке"
    if err := s.store.MarkProcessing(ctx, req.IdempotencyKey); err != nil {
        // Возможно, другой процесс уже начал обработку
        return nil, fmt.Errorf("mark processing: %w", err)
    }

    // 3. Выполняем бизнес-логику (собственно платёж)
    paymentID, err := s.executePayment(ctx, req)
    if err != nil {
        s.store.MarkFailed(ctx, req.IdempotencyKey, err.Error())
        return nil, fmt.Errorf("execute payment: %w", err)
    }

    // 4. Сохраняем успешный результат
    response := fmt.Sprintf(`{"payment_id":"%s","status":"success"}`, paymentID)
    if err := s.store.MarkCompleted(ctx, req.IdempotencyKey, response); err != nil {
        log.Printf("⚠️ Не удалось сохранить результат: %v", err)
        // Платёж уже проведён — логируем, но не возвращаем ошибку
    }

    return &PaymentResponse{
        PaymentID: paymentID,
        Status:    "success",
        Message:   "Payment processed successfully",
    }, nil
}

// executePayment — имитация выполнения платежа
func (s *PaymentService) executePayment(ctx context.Context, req PaymentRequest) (string, error) {
    // Имитация задержки
    time.Sleep(100 * time.Millisecond)

    // Генерация ID платежа
    paymentID := fmt.Sprintf("pay_%s", req.IdempotencyKey[:12])

    log.Printf("💰 Платёж выполнен: order=%s amount=%.2f payment=%s",
        req.OrderID, req.Amount, paymentID)

    return paymentID, nil
}

💻 Файл: internal/middleware/idempotency.go

package middleware

import (
    "encoding/json"
    "net/http"
)

// IdempotencyMiddleware — HTTP middleware для идемпотентности
// Ожидает заголовок Idempotency-Key в запросе
func IdempotencyMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        // Применяем ТОЛЬКО для мутирующих методов
        if r.Method == http.MethodGet || r.Method == http.MethodHead || r.Method == http.MethodOptions {
            next.ServeHTTP(w, r)
            return
        }

        // Проверяем наличие ключа идемпотентности
        key := r.Header.Get("Idempotency-Key")
        if key == "" {
            // Для строгого API — ошибка
            // Для мягкого — генерируем ключ сами
            w.Header().Set("Content-Type", "application/json")
            w.WriteHeader(http.StatusBadRequest)
            json.NewEncoder(w).Encode(map[string]string{
                "error": "Idempotency-Key header is required for mutating requests",
            })
            return
        }

        // Проверяем длину ключа
        if len(key) > 255 {
            w.Header().Set("Content-Type", "application/json")
            w.WriteHeader(http.StatusBadRequest)
            json.NewEncoder(w).Encode(map[string]string{
                "error": "Idempotency-Key must be <= 255 characters",
            })
            return
        }

        // Ключ в порядке — передаём дальше
        next.ServeHTTP(w, r)
    })
}

💻 Файл: cmd/demo/main.go

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/redis/go-redis/v9"
    "github.com/google/uuid"
    "go-idempotency/internal/idempotency"
)

func main() {
    log.SetFlags(log.LstdFlags | log.Lmicroseconds)
    fmt.Println("╔══════════════════════════════════════════╗")
    fmt.Println("║   ИДЕМПОТЕНТНОСТЬ И ДЕДУПЛИКАЦИЯ       ║")
    fmt.Println("╚══════════════════════════════════════════╝")

    // Подключаемся к Redis
    client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    defer client.Close()

    if err := client.Ping(context.Background()).Err(); err != nil {
        log.Fatalf("Redis: %v", err)
    }
    log.Println("✅ Подключено к Redis")

    // Создаём сервис
    store := idempotency.NewRedisStore(client, 1*time.Hour)
    svc := idempotency.NewPaymentService(store)
    ctx := context.Background()

    // ==========================================
    // 1. ПЕРВЫЙ ЗАПРОС — УСПЕХ
    // ==========================================
    fmt.Println("\n── 1. ПЕРВЫЙ ЗАПРОС ──")
    key := uuid.New().String()
    log.Printf("Ключ: %s", key)

    resp, err := svc.ProcessPayment(ctx, idempotency.PaymentRequest{
        IdempotencyKey: key,
        OrderID:        "order-123",
        Amount:         99.99,
        Currency:       "USD",
    })
    if err != nil {
        log.Printf("❌ Ошибка: %v", err)
    } else {
        log.Printf("%s: %s", resp.PaymentID, resp.Message)
    }

    // ==========================================
    // 2. ПОВТОРНЫЙ ЗАПРОС — ДУБЛИКАТ
    // ==========================================
    fmt.Println("\n── 2. ПОВТОРНЫЙ ЗАПРОС (ДУБЛИКАТ) ──")

    resp2, err2 := svc.ProcessPayment(ctx, idempotency.PaymentRequest{
        IdempotencyKey: key, // ТОТ ЖЕ КЛЮЧ
        OrderID:        "order-123",
        Amount:         99.99,
        Currency:       "USD",
    })
    if err2 != nil {
        log.Printf("❌ Ошибка: %v", err2)
    } else {
        log.Printf("%s: %s", resp2.PaymentID, resp2.Message)
    }

    // ==========================================
    // 3. НОВЫЙ КЛЮЧ — НОВЫЙ ПЛАТЁЖ
    // ==========================================
    fmt.Println("\n── 3. НОВЫЙ КЛЮЧ — НОВЫЙ ПЛАТЁЖ ──")
    newKey := uuid.New().String()
    log.Printf("Новый ключ: %s", newKey)

    resp3, err3 := svc.ProcessPayment(ctx, idempotency.PaymentRequest{
        IdempotencyKey: newKey,
        OrderID:        "order-456",
        Amount:         149.99,
        Currency:       "USD",
    })
    if err3 != nil {
        log.Printf("❌ Ошибка: %v", err3)
    } else {
        log.Printf("%s: %s", resp3.PaymentID, resp3.Message)
    }

    // ==========================================
    // 4. ДЕМОНСТРАЦИЯ КОНФЛИКТА (processing)
    // ==========================================
    fmt.Println("\n── 4. КОНФЛИКТ (два одновременных запроса) ──")
    conflictKey := uuid.New().String()

    // Имитация: вручную помечаем "в обработке"
    store.MarkProcessing(ctx, conflictKey)

    _, err4 := svc.ProcessPayment(ctx, idempotency.PaymentRequest{
        IdempotencyKey: conflictKey,
        OrderID:        "order-789",
        Amount:         200.00,
        Currency:       "USD",
    })
    if err4 != nil {
        log.Printf("❌ Ожидаемая ошибка: %v", err4)
    }

    log.Println("\n✅ Демонстрация завершена!")
}

🚀 Запуск

# Запускаем Redis
docker run -d --name redis-idempotency -p 6379:6379 redis:7-alpine

# Запускаем демонстрацию
go run ./cmd/demo/main.go

📊 Стратегии идемпотентности

СтратегияХранилищеПлюсыМинусы
Redis SetNXRedisБыстро, атомарно, TTLПотеря данных при сбое Redis
UNIQUE + ON CONFLICTPostgreSQLНадёжно, транзакционноМедленнее Redis
Kafka message_idKafka + БДСквозная дедупликацияСложнее в реализации
Клиентский ключЗаголовок HTTPПросто для APIКлиент может переиспользовать
⚠️ Типичные ошибки:
💡 Практический совет:

💡 Best practices от сеньоров:

🔑 Ключевые концепции

Что нужно запомнить из этого урока:

💡 Для Node.js разработчика:

← Предыдущий урок Следующий урок →