Урок 14: Миграции, транзакции и продвинутые SQL-паттерны

Урок 14. Миграции, транзакции и продвинутые SQL-паттерны

🔄 Node.js → Go (ключевые аналоги):

📋 Что изучаем

📦 Инициализация проекта

mkdir go-migrations && cd go-migrations
go mod init go-migrations

# Устанавливаем зависимости
go get github.com/jackc/pgx/v5
go get github.com/jackc/pgx/v5/pgxpool
go get github.com/golang-migrate/migrate/v4
go get github.com/golang-migrate/migrate/v4/database/postgres
go get github.com/golang-migrate/migrate/v4/source/file
go get github.com/google/uuid
go mod tidy

# Запускаем PostgreSQL
docker run -d --name pg-lesson14 \
  -e POSTGRES_PASSWORD=secret \
  -e POSTGRES_DB=bankdb \
  -p 5432:5432 \
  postgres:16-alpine

# Создаём директорию для миграций
mkdir -p migrations

🗄️ Файлы миграций

Файл: migrations/000001_create_accounts.up.sql

-- UP миграция: создаём таблицу счетов
CREATE TABLE IF NOT EXISTS accounts (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    owner       VARCHAR(255) NOT NULL,
    balance     BIGINT NOT NULL DEFAULT 0
        CHECK (balance >= 0),  -- Баланс не может быть отрицательным
    version     INT NOT NULL DEFAULT 1,  -- Для оптимистичной блокировки
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_accounts_owner ON accounts(owner);

-- Создаём таблицу транзакций (история переводов)
CREATE TABLE IF NOT EXISTS transfers (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    from_account_id UUID NOT NULL REFERENCES accounts(id),
    to_account_id   UUID NOT NULL REFERENCES accounts(id),
    amount          BIGINT NOT NULL CHECK (amount > 0),
    description     TEXT DEFAULT '',
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- Проверка: нельзя перевести самому себе
    CONSTRAINT different_accounts CHECK (from_account_id != to_account_id)
);

CREATE INDEX idx_transfers_from ON transfers(from_account_id);
CREATE INDEX idx_transfers_to ON transfers(to_account_id);
CREATE INDEX idx_transfers_created ON transfers(created_at DESC);

-- Триггер для updated_at
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$ language 'plpgsql';

CREATE TRIGGER update_accounts_updated_at
    BEFORE UPDATE ON accounts
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at_column();

Файл: migrations/000001_create_accounts.down.sql

-- DOWN миграция: откатываем
DROP TABLE IF EXISTS transfers;
DROP TABLE IF EXISTS accounts;
DROP FUNCTION IF EXISTS update_updated_at_column();

Файл: migrations/000002_add_account_type.up.sql

-- Добавляем тип счёта
ALTER TABLE accounts
ADD COLUMN IF NOT EXISTS account_type VARCHAR(50) NOT NULL DEFAULT 'checking'
    CHECK (account_type IN ('checking', 'savings', 'credit'));

CREATE INDEX idx_accounts_type ON accounts(account_type);

Файл: migrations/000002_add_account_type.down.sql

ALTER TABLE accounts DROP COLUMN IF EXISTS account_type;

💻 Код программы

package main

import (
    "context"
    "errors"
    "fmt"
    "log"
    "os"
    "time"

    "github.com/google/uuid"
    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/golang-migrate/migrate/v4"
    _ "github.com/golang-migrate/migrate/v4/database/postgres"
    _ "github.com/golang-migrate/migrate/v4/source/file"
)

// ╔══════════════════════════════════════════════════════════╗
// ║  1. МОДЕЛИ ДАННЫХ                                      ║
// ╚══════════════════════════════════════════════════════════╝

type Account struct {
    ID          uuid.UUID `json:"id"`
    Owner       string    `json:"owner"`
    Balance     int64     `json:"balance"`      // В центах/копейках!
    Version     int       `json:"version"`       // Для оптимистичной блокировки
    AccountType string    `json:"account_type"`
    CreatedAt   time.Time `json:"created_at"`
    UpdatedAt   time.Time `json:"updated_at"`
}

type Transfer struct {
    ID            uuid.UUID `json:"id"`
    FromAccountID uuid.UUID `json:"from_account_id"`
    ToAccountID   uuid.UUID `json:"to_account_id"`
    Amount        int64     `json:"amount"`      // В центах/копейках!
    Description   string    `json:"description"`
    CreatedAt     time.Time `json:"created_at"`
}

// ╔══════════════════════════════════════════════════════════╗
// ║  2. ЗАПУСК МИГРАЦИЙ                                    ║
// ╚══════════════════════════════════════════════════════════╝

func runMigrations(dbURL string) error {
    log.Println("📦 Запуск миграций...")

    // Создаём экземпляр migrate
    // file://migrations — путь к директории с файлами миграций
    m, err := migrate.New(
        "file://migrations",
        dbURL,
    )
    if err != nil {
        return fmt.Errorf("создание migrate: %w", err)
    }
    defer m.Close()

    // Применяем все неприменённые миграции
    if err := m.Up(); err != nil {
        if errors.Is(err, migrate.ErrNoChange) {
            log.Println("  ✓ Миграции не требуются (уже актуально)")
            return nil
        }
        return fmt.Errorf("применение миграций: %w", err)
    }

    log.Println("  ✓ Миграции применены успешно")
    return nil
}

// ╔══════════════════════════════════════════════════════════╗
// ║  3. РЕПОЗИТОРИЙ С ТРАНЗАКЦИЯМИ                         ║
// ╚══════════════════════════════════════════════════════════╝

type AccountRepository struct {
    pool *pgxpool.Pool
}

func NewAccountRepository(pool *pgxpool.Pool) *AccountRepository {
    return &AccountRepository{pool: pool}
}

// CreateAccount — создаёт счёт
func (r *AccountRepository) CreateAccount(ctx context.Context, owner, accountType string, initialBalance int64) (*Account, error) {
    query := `
        INSERT INTO accounts (owner, account_type, balance)
        VALUES ($1, $2, $3)
        RETURNING id, owner, balance, version, account_type, created_at, updated_at
    `
    acc := &Account{}
    err := r.pool.QueryRow(ctx, query, owner, accountType, initialBalance).Scan(
        &acc.ID, &acc.Owner, &acc.Balance, &acc.Version,
        &acc.AccountType, &acc.CreatedAt, &acc.UpdatedAt,
    )
    if err != nil {
        return nil, fmt.Errorf("create account: %w", err)
    }
    return acc, nil
}

// GetAccount — получает счёт по ID
func (r *AccountRepository) GetAccount(ctx context.Context, id uuid.UUID) (*Account, error) {
    query := `SELECT id, owner, balance, version, account_type, created_at, updated_at
              FROM accounts WHERE id = $1`
    acc := &Account{}
    err := r.pool.QueryRow(ctx, query, id).Scan(
        &acc.ID, &acc.Owner, &acc.Balance, &acc.Version,
        &acc.AccountType, &acc.CreatedAt, &acc.UpdatedAt,
    )
    if err != nil {
        if errors.Is(err, pgx.ErrNoRows) {
            return nil, fmt.Errorf("account %s: %w", id, ErrNotFound)
        }
        return nil, fmt.Errorf("get account: %w", err)
    }
    return acc, nil
}

// ╔══════════════════════════════════════════════════════════╗
// ║  4. ТРАНЗАКЦИИ: ПЕРЕВОД ДЕНЕГ                          ║
// ╚══════════════════════════════════════════════════════════╝

var (
    ErrNotFound         = errors.New("not found")
    ErrInsufficientFunds = errors.New("insufficient funds")
    ErrVersionConflict   = errors.New("version conflict — retry")
)

// TransferMoney — перевод денег с одного счёта на другой.
// Использует транзакцию с пессимистичной блокировкой.
func (r *AccountRepository) TransferMoney(
    ctx context.Context,
    fromID, toID uuid.UUID,
    amount int64,
    description string,
) (*Transfer, error) {
    // Начинаем транзакцию
    // pgx.BeginTxOptions позволяет задать уровень изоляции
    tx, err := r.pool.BeginTx(ctx, pgx.TxOptions{
        IsoLevel: pgx.ReadCommitted, // Уровень изоляции
        AccessMode: pgx.ReadWrite,
    })
    if err != nil {
        return nil, fmt.Errorf("begin tx: %w", err)
    }
    // defer tx.Rollback(ctx) — безопасно, если транзакция уже закоммичена
    defer tx.Rollback(ctx)

    // ==========================================
    // Шаг 1: Блокируем счёт отправителя (FOR UPDATE)
    // ==========================================
    var fromBalance int64
    var fromVersion int
    err = tx.QueryRow(ctx,
        `SELECT balance, version FROM accounts
         WHERE id = $1
         FOR UPDATE`, // Пессимистичная блокировка
        fromID,
    ).Scan(&fromBalance, &fromVersion)
    if err != nil {
        if errors.Is(err, pgx.ErrNoRows) {
            return nil, fmt.Errorf("from account: %w", ErrNotFound)
        }
        return nil, fmt.Errorf("lock from account: %w", err)
    }

    // Проверяем баланс
    if fromBalance < amount {
        return nil, ErrInsufficientFunds
    }

    // ==========================================
    // Шаг 2: Блокируем счёт получателя
    // ==========================================
    var toBalance int64
    err = tx.QueryRow(ctx,
        `SELECT balance FROM accounts
         WHERE id = $1
         FOR UPDATE`,
        toID,
    ).Scan(&toBalance)
    if err != nil {
        if errors.Is(err, pgx.ErrNoRows) {
            return nil, fmt.Errorf("to account: %w", ErrNotFound)
        }
        return nil, fmt.Errorf("lock to account: %w", err)
    }

    // ==========================================
    // Шаг 3: Списываем с отправителя
    // ==========================================
    _, err = tx.Exec(ctx,
        `UPDATE accounts
         SET balance = balance - $1, version = version + 1
         WHERE id = $2`,
        amount, fromID,
    )
    if err != nil {
        return nil, fmt.Errorf("debit from account: %w", err)
    }

    // ==========================================
    // Шаг 4: Зачисляем получателю
    // ==========================================
    _, err = tx.Exec(ctx,
        `UPDATE accounts
         SET balance = balance + $1, version = version + 1
         WHERE id = $2`,
        amount, toID,
    )
    if err != nil {
        return nil, fmt.Errorf("credit to account: %w", err)
    }

    // ==========================================
    // Шаг 5: Записываем в историю
    // ==========================================
    transfer := &Transfer{
        FromAccountID: fromID,
        ToAccountID:   toID,
        Amount:        amount,
        Description:   description,
    }
    err = tx.QueryRow(ctx,
        `INSERT INTO transfers (from_account_id, to_account_id, amount, description)
         VALUES ($1, $2, $3, $4)
         RETURNING id, created_at`,
        transfer.FromAccountID, transfer.ToAccountID,
        transfer.Amount, transfer.Description,
    ).Scan(&transfer.ID, &transfer.CreatedAt)
    if err != nil {
        return nil, fmt.Errorf("create transfer record: %w", err)
    }

    // ==========================================
    // Шаг 6: Коммитим транзакцию
    // ==========================================
    if err := tx.Commit(ctx); err != nil {
        return nil, fmt.Errorf("commit tx: %w", err)
    }

    return transfer, nil
}

// ╔══════════════════════════════════════════════════════════╗
// ║  5. ОПТИМИСТИЧНАЯ БЛОКИРОВКА                           ║
// ╚══════════════════════════════════════════════════════════╝

// UpdateBalanceOptimistic — обновление через версию (оптимистичная блокировка)
func (r *AccountRepository) UpdateBalanceOptimistic(
    ctx context.Context,
    id uuid.UUID,
    expectedVersion int,
    newBalance int64,
) (*Account, error) {
    query := `
        UPDATE accounts
        SET balance = $1, version = version + 1
        WHERE id = $2 AND version = $3
        RETURNING id, owner, balance, version, account_type, created_at, updated_at
    `
    acc := &Account{}
    err := r.pool.QueryRow(ctx, query, newBalance, id, expectedVersion).Scan(
        &acc.ID, &acc.Owner, &acc.Balance, &acc.Version,
        &acc.AccountType, &acc.CreatedAt, &acc.UpdatedAt,
    )
    if err != nil {
        if errors.Is(err, pgx.ErrNoRows) {
            // Если 0 строк обновлено — версия изменилась (конфликт)
            return nil, ErrVersionConflict
        }
        return nil, fmt.Errorf("optimistic update: %w", err)
    }
    return acc, nil
}

// ╔══════════════════════════════════════════════════════════╗
// ║  6. BATCH ОПЕРАЦИИ                                     ║
// ╚══════════════════════════════════════════════════════════╝

// CreateAccountsBatch — массовое создание счетов через pgx.Batch
func (r *AccountRepository) CreateAccountsBatch(ctx context.Context, accounts []struct {
    Owner   string
    Type    string
    Balance int64
}) ([]Account, error) {
    batch := &pgx.Batch{}

    for _, acc := range accounts {
        batch.Queue(
            `INSERT INTO accounts (owner, account_type, balance)
             VALUES ($1, $2, $3)
             RETURNING id, owner, balance, version, account_type, created_at, updated_at`,
            acc.Owner, acc.Type, acc.Balance,
        )
    }

    // Отправляем batch
    br := r.pool.SendBatch(ctx, batch)
    defer br.Close()

    var results []Account
    for range accounts {
        var acc Account
        err := br.QueryRow().Scan(
            &acc.ID, &acc.Owner, &acc.Balance, &acc.Version,
            &acc.AccountType, &acc.CreatedAt, &acc.UpdatedAt,
        )
        if err != nil {
            return nil, fmt.Errorf("batch scan: %w", err)
        }
        results = append(results, acc)
    }

    return results, nil
}

// ╔══════════════════════════════════════════════════════════╗
// ║  7. ТРАНЗАКЦИЯ С ТАЙМАУТОМ                             ║
// ╚══════════════════════════════════════════════════════════╝

// TransferWithTimeout — перевод с таймаутом на всю операцию
func (r *AccountRepository) TransferWithTimeout(
    parentCtx context.Context,
    fromID, toID uuid.UUID,
    amount int64,
    timeout time.Duration,
) (*Transfer, error) {
    // Создаём контекст с таймаутом для всей транзакции
    ctx, cancel := context.WithTimeout(parentCtx, timeout)
    defer cancel()

    return r.TransferMoney(ctx, fromID, toID, amount, "transfer with timeout")
}

// ╔══════════════════════════════════════════════════════════╗
// ║  8. REPOSITORY С ПОДДЕРЖКОЙ ВНЕШНЕЙ ТРАНЗАКЦИИ        ║
// ╚══════════════════════════════════════════════════════════╝

// UpdateBalanceTx — обновление в существующей транзакции
func (r *AccountRepository) UpdateBalanceTx(
    ctx context.Context,
    tx pgx.Tx,  // Принимаем транзакцию извне
    id uuid.UUID,
    amount int64,
) error {
    _, err := tx.Exec(ctx,
        `UPDATE accounts SET balance = balance + $1 WHERE id = $2`,
        amount, id,
    )
    return err
}

// ╔══════════════════════════════════════════════════════════╗
// ║  MAIN                                                    ║
// ╚══════════════════════════════════════════════════════════╝

func main() {
    log.SetFlags(log.LstdFlags | log.Lmicroseconds)
    log.Println("🏦 Банковская система (демонстрация транзакций)")

    // Конфигурация БД
    dbURL := os.Getenv("DATABASE_URL")
    if dbURL == "" {
        dbURL = "postgres://postgres:secret@localhost:5432/bankdb?sslmode=disable"
    }

    // ==========================================
    // 1. Запуск миграций
    // ==========================================
    if err := runMigrations(dbURL); err != nil {
        log.Fatalf("Миграции: %v", err)
    }

    // ==========================================
    // 2. Подключение к БД
    // ==========================================
    ctx := context.Background()
    pool, err := pgxpool.New(ctx, dbURL)
    if err != nil {
        log.Fatalf("Пул соединений: %v", err)
    }
    defer pool.Close()

    if err := pool.Ping(ctx); err != nil {
        log.Fatalf("Пинг: %v", err)
    }
    log.Println("✅ Подключено к PostgreSQL")

    repo := NewAccountRepository(pool)

    // ==========================================
    // 3. Создание счетов
    // ==========================================
    log.Println("\n── СОЗДАНИЕ СЧЕТОВ ──")

    alice, err := repo.CreateAccount(ctx, "Alice", "checking", 10000) // $100.00
    if err != nil {
        log.Fatalf("Создание счёта Alice: %v", err)
    }
    log.Printf("  Alice: balance=%d.%02d (version=%d)",
        alice.Balance/100, alice.Balance%100, alice.Version)

    bob, err := repo.CreateAccount(ctx, "Bob", "checking", 5000) // $50.00
    if err != nil {
        log.Fatalf("Создание счёта Bob: %v", err)
    }
    log.Printf("  Bob: balance=%d.%02d (version=%d)",
        bob.Balance/100, bob.Balance%100, bob.Version)

    // ==========================================
    // 4. Перевод денег (транзакция)
    // ==========================================
    log.Println("\n── ПЕРЕВОД $30.00 ОТ ALICE К BOB ──")

    transfer, err := repo.TransferMoney(ctx, alice.ID, bob.ID, 3000, "Оплата за кофе")
    if err != nil {
        log.Printf("  Ошибка перевода: %v", err)
    } else {
        log.Printf("  ✓ Перевод выполнен: %s (сумма: %d.%02d)",
            transfer.ID, transfer.Amount/100, transfer.Amount%100)
    }

    // Проверяем балансы после перевода
    aliceAfter, _ := repo.GetAccount(ctx, alice.ID)
    bobAfter, _ := repo.GetAccount(ctx, bob.ID)
    log.Printf("  Alice: balance=%d.%02d (version=%d)",
        aliceAfter.Balance/100, aliceAfter.Balance%100, aliceAfter.Version)
    log.Printf("  Bob: balance=%d.%02d (version=%d)",
        bobAfter.Balance/100, bobAfter.Balance%100, bobAfter.Version)

    // ==========================================
    // 5. Демонстрация ошибки (недостаточно средств)
    // ==========================================
    log.Println("\n── ПОПЫТКА ПЕРЕВОДА $200.00 (недостаточно) ──")

    _, err = repo.TransferMoney(ctx, bob.ID, alice.ID, 20000, "Ошибка")
    if errors.Is(err, ErrInsufficientFunds) {
        log.Println("  ✓ Ожидаемая ошибка: недостаточно средств")
    } else if err != nil {
        log.Printf("  Ошибка: %v", err)
    }

    // ==========================================
    // 6. Оптимистичная блокировка
    // ==========================================
    log.Println("\n── ОПТИМИСТИЧНАЯ БЛОКИРОВКА ──")

    // Успешное обновление (версия совпадает)
    updated, err := repo.UpdateBalanceOptimistic(ctx, alice.ID, aliceAfter.Version, 5000)
    if err != nil {
        log.Printf("  Ошибка (неожиданно): %v", err)
    } else {
        log.Printf("  ✓ Обновлён баланс Alice: %d.%02d (version=%d)",
            updated.Balance/100, updated.Balance%100, updated.Version)
    }

    // Конфликт версий (используем старую версию)
    _, err = repo.UpdateBalanceOptimistic(ctx, alice.ID, aliceAfter.Version, 9999)
    if errors.Is(err, ErrVersionConflict) {
        log.Println("  ✓ Ожидаемый конфликт версий")
    }

    // ==========================================
    // 7. Batch-создание счетов
    // ==========================================
    log.Println("\n── BATCH СОЗДАНИЕ СЧЕТОВ ──")

    batchAccounts := []struct {
        Owner   string
        Type    string
        Balance int64
    }{
        {"Charlie", "savings", 15000},
        {"Diana", "checking", 20000},
        {"Eve", "credit", 0},
    }

    created, err := repo.CreateAccountsBatch(ctx, batchAccounts)
    if err != nil {
        log.Printf("  Ошибка batch: %v", err)
    } else {
        log.Printf("  ✓ Создано %d счетов:", len(created))
        for _, acc := range created {
            log.Printf("    - %s: %d.%02d [%s]",
                acc.Owner, acc.Balance/100, acc.Balance%100, acc.AccountType)
        }
    }

    log.Println("\n✅ Демонстрация завершена!")
}

🚀 Запуск программы

# Применяем миграции и запускаем
DATABASE_URL="postgres://postgres:secret@localhost:5432/bankdb?sslmode=disable" \
  go run main.go

# Только миграции (без запуска программы)
go run main.go -migrate-only

# Откат миграций
migrate -database "postgres://postgres:secret@localhost:5432/bankdb?sslmode=disable" \
  -path migrations down

# Просмотр версии миграций
migrate -database "postgres://postgres:secret@localhost:5432/bankdb?sslmode=disable" \
  -path migrations version

📊 Уровни изоляции транзакций

УровеньГрязное чтениеНеповторяющееся чтениеФантомное чтениеПроизводительность
Read Uncommitted✅ ВозможноМаксимальная
Read Committed (default)Высокая
Repeatable Read✅ (в PG — нет)Средняя
SerializableНизкая

📊 Пессимистичная vs Оптимистичная блокировка

ХарактеристикаПессимистичная (FOR UPDATE)Оптимистичная (version)
МеханизмБлокирует строку в БДПроверяет версию при обновлении
КонфликтыОжидание снятия блокировкиОшибка, нужно повторить
ПроизводительностьНиже (блокировки)Выше (нет блокировок)
Подходит дляВысокая конкуренция за одну записьРедкие конфликты
DeadlockВозможенНевозможен
⚠️ Типичные ошибки:
💡 Практический совет:

💡 Best practices от сеньоров:

🔑 Ключевые концепции

Что нужно запомнить из этого урока:

💡 Для Node.js разработчика:

← Предыдущий урок Следующий урок →