knex migrate / prisma migrate → golang-migrate/migrate
await pool.transaction(async (trx) => {}) → pool.Begin(ctx) + tx.Commit(ctx)
SELECT ... FOR UPDATE → пессимистичная блокировка
Batch insert → pgx.Batch или COPY FROM
Миграции в коде (TypeORM) → SQL-файлы миграций (up/down)
golang-migrate/migrate
mkdir go-migrations && cd go-migrations
go mod init go-migrations
# Устанавливаем зависимости
go get github.com/jackc/pgx/v5
go get github.com/jackc/pgx/v5/pgxpool
go get github.com/golang-migrate/migrate/v4
go get github.com/golang-migrate/migrate/v4/database/postgres
go get github.com/golang-migrate/migrate/v4/source/file
go get github.com/google/uuid
go mod tidy
# Запускаем PostgreSQL
docker run -d --name pg-lesson14 \
-e POSTGRES_PASSWORD=secret \
-e POSTGRES_DB=bankdb \
-p 5432:5432 \
postgres:16-alpine
# Создаём директорию для миграций
mkdir -p migrations
migrations/000001_create_accounts.up.sql-- UP миграция: создаём таблицу счетов
CREATE TABLE IF NOT EXISTS accounts (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
owner VARCHAR(255) NOT NULL,
balance BIGINT NOT NULL DEFAULT 0
CHECK (balance >= 0), -- Баланс не может быть отрицательным
version INT NOT NULL DEFAULT 1, -- Для оптимистичной блокировки
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx_accounts_owner ON accounts(owner);
-- Создаём таблицу транзакций (история переводов)
CREATE TABLE IF NOT EXISTS transfers (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
from_account_id UUID NOT NULL REFERENCES accounts(id),
to_account_id UUID NOT NULL REFERENCES accounts(id),
amount BIGINT NOT NULL CHECK (amount > 0),
description TEXT DEFAULT '',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Проверка: нельзя перевести самому себе
CONSTRAINT different_accounts CHECK (from_account_id != to_account_id)
);
CREATE INDEX idx_transfers_from ON transfers(from_account_id);
CREATE INDEX idx_transfers_to ON transfers(to_account_id);
CREATE INDEX idx_transfers_created ON transfers(created_at DESC);
-- Триггер для updated_at
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$ language 'plpgsql';
CREATE TRIGGER update_accounts_updated_at
BEFORE UPDATE ON accounts
FOR EACH ROW
EXECUTE FUNCTION update_updated_at_column();
migrations/000001_create_accounts.down.sql-- DOWN миграция: откатываем
DROP TABLE IF EXISTS transfers;
DROP TABLE IF EXISTS accounts;
DROP FUNCTION IF EXISTS update_updated_at_column();
migrations/000002_add_account_type.up.sql-- Добавляем тип счёта
ALTER TABLE accounts
ADD COLUMN IF NOT EXISTS account_type VARCHAR(50) NOT NULL DEFAULT 'checking'
CHECK (account_type IN ('checking', 'savings', 'credit'));
CREATE INDEX idx_accounts_type ON accounts(account_type);
migrations/000002_add_account_type.down.sqlALTER TABLE accounts DROP COLUMN IF EXISTS account_type;
package main
import (
"context"
"errors"
"fmt"
"log"
"os"
"time"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/golang-migrate/migrate/v4"
_ "github.com/golang-migrate/migrate/v4/database/postgres"
_ "github.com/golang-migrate/migrate/v4/source/file"
)
// ╔══════════════════════════════════════════════════════════╗
// ║ 1. МОДЕЛИ ДАННЫХ ║
// ╚══════════════════════════════════════════════════════════╝
type Account struct {
ID uuid.UUID `json:"id"`
Owner string `json:"owner"`
Balance int64 `json:"balance"` // В центах/копейках!
Version int `json:"version"` // Для оптимистичной блокировки
AccountType string `json:"account_type"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
type Transfer struct {
ID uuid.UUID `json:"id"`
FromAccountID uuid.UUID `json:"from_account_id"`
ToAccountID uuid.UUID `json:"to_account_id"`
Amount int64 `json:"amount"` // В центах/копейках!
Description string `json:"description"`
CreatedAt time.Time `json:"created_at"`
}
// ╔══════════════════════════════════════════════════════════╗
// ║ 2. ЗАПУСК МИГРАЦИЙ ║
// ╚══════════════════════════════════════════════════════════╝
func runMigrations(dbURL string) error {
log.Println("📦 Запуск миграций...")
// Создаём экземпляр migrate
// file://migrations — путь к директории с файлами миграций
m, err := migrate.New(
"file://migrations",
dbURL,
)
if err != nil {
return fmt.Errorf("создание migrate: %w", err)
}
defer m.Close()
// Применяем все неприменённые миграции
if err := m.Up(); err != nil {
if errors.Is(err, migrate.ErrNoChange) {
log.Println(" ✓ Миграции не требуются (уже актуально)")
return nil
}
return fmt.Errorf("применение миграций: %w", err)
}
log.Println(" ✓ Миграции применены успешно")
return nil
}
// ╔══════════════════════════════════════════════════════════╗
// ║ 3. РЕПОЗИТОРИЙ С ТРАНЗАКЦИЯМИ ║
// ╚══════════════════════════════════════════════════════════╝
type AccountRepository struct {
pool *pgxpool.Pool
}
func NewAccountRepository(pool *pgxpool.Pool) *AccountRepository {
return &AccountRepository{pool: pool}
}
// CreateAccount — создаёт счёт
func (r *AccountRepository) CreateAccount(ctx context.Context, owner, accountType string, initialBalance int64) (*Account, error) {
query := `
INSERT INTO accounts (owner, account_type, balance)
VALUES ($1, $2, $3)
RETURNING id, owner, balance, version, account_type, created_at, updated_at
`
acc := &Account{}
err := r.pool.QueryRow(ctx, query, owner, accountType, initialBalance).Scan(
&acc.ID, &acc.Owner, &acc.Balance, &acc.Version,
&acc.AccountType, &acc.CreatedAt, &acc.UpdatedAt,
)
if err != nil {
return nil, fmt.Errorf("create account: %w", err)
}
return acc, nil
}
// GetAccount — получает счёт по ID
func (r *AccountRepository) GetAccount(ctx context.Context, id uuid.UUID) (*Account, error) {
query := `SELECT id, owner, balance, version, account_type, created_at, updated_at
FROM accounts WHERE id = $1`
acc := &Account{}
err := r.pool.QueryRow(ctx, query, id).Scan(
&acc.ID, &acc.Owner, &acc.Balance, &acc.Version,
&acc.AccountType, &acc.CreatedAt, &acc.UpdatedAt,
)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, fmt.Errorf("account %s: %w", id, ErrNotFound)
}
return nil, fmt.Errorf("get account: %w", err)
}
return acc, nil
}
// ╔══════════════════════════════════════════════════════════╗
// ║ 4. ТРАНЗАКЦИИ: ПЕРЕВОД ДЕНЕГ ║
// ╚══════════════════════════════════════════════════════════╝
var (
ErrNotFound = errors.New("not found")
ErrInsufficientFunds = errors.New("insufficient funds")
ErrVersionConflict = errors.New("version conflict — retry")
)
// TransferMoney — перевод денег с одного счёта на другой.
// Использует транзакцию с пессимистичной блокировкой.
func (r *AccountRepository) TransferMoney(
ctx context.Context,
fromID, toID uuid.UUID,
amount int64,
description string,
) (*Transfer, error) {
// Начинаем транзакцию
// pgx.BeginTxOptions позволяет задать уровень изоляции
tx, err := r.pool.BeginTx(ctx, pgx.TxOptions{
IsoLevel: pgx.ReadCommitted, // Уровень изоляции
AccessMode: pgx.ReadWrite,
})
if err != nil {
return nil, fmt.Errorf("begin tx: %w", err)
}
// defer tx.Rollback(ctx) — безопасно, если транзакция уже закоммичена
defer tx.Rollback(ctx)
// ==========================================
// Шаг 1: Блокируем счёт отправителя (FOR UPDATE)
// ==========================================
var fromBalance int64
var fromVersion int
err = tx.QueryRow(ctx,
`SELECT balance, version FROM accounts
WHERE id = $1
FOR UPDATE`, // Пессимистичная блокировка
fromID,
).Scan(&fromBalance, &fromVersion)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, fmt.Errorf("from account: %w", ErrNotFound)
}
return nil, fmt.Errorf("lock from account: %w", err)
}
// Проверяем баланс
if fromBalance < amount {
return nil, ErrInsufficientFunds
}
// ==========================================
// Шаг 2: Блокируем счёт получателя
// ==========================================
var toBalance int64
err = tx.QueryRow(ctx,
`SELECT balance FROM accounts
WHERE id = $1
FOR UPDATE`,
toID,
).Scan(&toBalance)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, fmt.Errorf("to account: %w", ErrNotFound)
}
return nil, fmt.Errorf("lock to account: %w", err)
}
// ==========================================
// Шаг 3: Списываем с отправителя
// ==========================================
_, err = tx.Exec(ctx,
`UPDATE accounts
SET balance = balance - $1, version = version + 1
WHERE id = $2`,
amount, fromID,
)
if err != nil {
return nil, fmt.Errorf("debit from account: %w", err)
}
// ==========================================
// Шаг 4: Зачисляем получателю
// ==========================================
_, err = tx.Exec(ctx,
`UPDATE accounts
SET balance = balance + $1, version = version + 1
WHERE id = $2`,
amount, toID,
)
if err != nil {
return nil, fmt.Errorf("credit to account: %w", err)
}
// ==========================================
// Шаг 5: Записываем в историю
// ==========================================
transfer := &Transfer{
FromAccountID: fromID,
ToAccountID: toID,
Amount: amount,
Description: description,
}
err = tx.QueryRow(ctx,
`INSERT INTO transfers (from_account_id, to_account_id, amount, description)
VALUES ($1, $2, $3, $4)
RETURNING id, created_at`,
transfer.FromAccountID, transfer.ToAccountID,
transfer.Amount, transfer.Description,
).Scan(&transfer.ID, &transfer.CreatedAt)
if err != nil {
return nil, fmt.Errorf("create transfer record: %w", err)
}
// ==========================================
// Шаг 6: Коммитим транзакцию
// ==========================================
if err := tx.Commit(ctx); err != nil {
return nil, fmt.Errorf("commit tx: %w", err)
}
return transfer, nil
}
// ╔══════════════════════════════════════════════════════════╗
// ║ 5. ОПТИМИСТИЧНАЯ БЛОКИРОВКА ║
// ╚══════════════════════════════════════════════════════════╝
// UpdateBalanceOptimistic — обновление через версию (оптимистичная блокировка)
func (r *AccountRepository) UpdateBalanceOptimistic(
ctx context.Context,
id uuid.UUID,
expectedVersion int,
newBalance int64,
) (*Account, error) {
query := `
UPDATE accounts
SET balance = $1, version = version + 1
WHERE id = $2 AND version = $3
RETURNING id, owner, balance, version, account_type, created_at, updated_at
`
acc := &Account{}
err := r.pool.QueryRow(ctx, query, newBalance, id, expectedVersion).Scan(
&acc.ID, &acc.Owner, &acc.Balance, &acc.Version,
&acc.AccountType, &acc.CreatedAt, &acc.UpdatedAt,
)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
// Если 0 строк обновлено — версия изменилась (конфликт)
return nil, ErrVersionConflict
}
return nil, fmt.Errorf("optimistic update: %w", err)
}
return acc, nil
}
// ╔══════════════════════════════════════════════════════════╗
// ║ 6. BATCH ОПЕРАЦИИ ║
// ╚══════════════════════════════════════════════════════════╝
// CreateAccountsBatch — массовое создание счетов через pgx.Batch
func (r *AccountRepository) CreateAccountsBatch(ctx context.Context, accounts []struct {
Owner string
Type string
Balance int64
}) ([]Account, error) {
batch := &pgx.Batch{}
for _, acc := range accounts {
batch.Queue(
`INSERT INTO accounts (owner, account_type, balance)
VALUES ($1, $2, $3)
RETURNING id, owner, balance, version, account_type, created_at, updated_at`,
acc.Owner, acc.Type, acc.Balance,
)
}
// Отправляем batch
br := r.pool.SendBatch(ctx, batch)
defer br.Close()
var results []Account
for range accounts {
var acc Account
err := br.QueryRow().Scan(
&acc.ID, &acc.Owner, &acc.Balance, &acc.Version,
&acc.AccountType, &acc.CreatedAt, &acc.UpdatedAt,
)
if err != nil {
return nil, fmt.Errorf("batch scan: %w", err)
}
results = append(results, acc)
}
return results, nil
}
// ╔══════════════════════════════════════════════════════════╗
// ║ 7. ТРАНЗАКЦИЯ С ТАЙМАУТОМ ║
// ╚══════════════════════════════════════════════════════════╝
// TransferWithTimeout — перевод с таймаутом на всю операцию
func (r *AccountRepository) TransferWithTimeout(
parentCtx context.Context,
fromID, toID uuid.UUID,
amount int64,
timeout time.Duration,
) (*Transfer, error) {
// Создаём контекст с таймаутом для всей транзакции
ctx, cancel := context.WithTimeout(parentCtx, timeout)
defer cancel()
return r.TransferMoney(ctx, fromID, toID, amount, "transfer with timeout")
}
// ╔══════════════════════════════════════════════════════════╗
// ║ 8. REPOSITORY С ПОДДЕРЖКОЙ ВНЕШНЕЙ ТРАНЗАКЦИИ ║
// ╚══════════════════════════════════════════════════════════╝
// UpdateBalanceTx — обновление в существующей транзакции
func (r *AccountRepository) UpdateBalanceTx(
ctx context.Context,
tx pgx.Tx, // Принимаем транзакцию извне
id uuid.UUID,
amount int64,
) error {
_, err := tx.Exec(ctx,
`UPDATE accounts SET balance = balance + $1 WHERE id = $2`,
amount, id,
)
return err
}
// ╔══════════════════════════════════════════════════════════╗
// ║ MAIN ║
// ╚══════════════════════════════════════════════════════════╝
func main() {
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
log.Println("🏦 Банковская система (демонстрация транзакций)")
// Конфигурация БД
dbURL := os.Getenv("DATABASE_URL")
if dbURL == "" {
dbURL = "postgres://postgres:secret@localhost:5432/bankdb?sslmode=disable"
}
// ==========================================
// 1. Запуск миграций
// ==========================================
if err := runMigrations(dbURL); err != nil {
log.Fatalf("Миграции: %v", err)
}
// ==========================================
// 2. Подключение к БД
// ==========================================
ctx := context.Background()
pool, err := pgxpool.New(ctx, dbURL)
if err != nil {
log.Fatalf("Пул соединений: %v", err)
}
defer pool.Close()
if err := pool.Ping(ctx); err != nil {
log.Fatalf("Пинг: %v", err)
}
log.Println("✅ Подключено к PostgreSQL")
repo := NewAccountRepository(pool)
// ==========================================
// 3. Создание счетов
// ==========================================
log.Println("\n── СОЗДАНИЕ СЧЕТОВ ──")
alice, err := repo.CreateAccount(ctx, "Alice", "checking", 10000) // $100.00
if err != nil {
log.Fatalf("Создание счёта Alice: %v", err)
}
log.Printf(" Alice: balance=%d.%02d (version=%d)",
alice.Balance/100, alice.Balance%100, alice.Version)
bob, err := repo.CreateAccount(ctx, "Bob", "checking", 5000) // $50.00
if err != nil {
log.Fatalf("Создание счёта Bob: %v", err)
}
log.Printf(" Bob: balance=%d.%02d (version=%d)",
bob.Balance/100, bob.Balance%100, bob.Version)
// ==========================================
// 4. Перевод денег (транзакция)
// ==========================================
log.Println("\n── ПЕРЕВОД $30.00 ОТ ALICE К BOB ──")
transfer, err := repo.TransferMoney(ctx, alice.ID, bob.ID, 3000, "Оплата за кофе")
if err != nil {
log.Printf(" Ошибка перевода: %v", err)
} else {
log.Printf(" ✓ Перевод выполнен: %s (сумма: %d.%02d)",
transfer.ID, transfer.Amount/100, transfer.Amount%100)
}
// Проверяем балансы после перевода
aliceAfter, _ := repo.GetAccount(ctx, alice.ID)
bobAfter, _ := repo.GetAccount(ctx, bob.ID)
log.Printf(" Alice: balance=%d.%02d (version=%d)",
aliceAfter.Balance/100, aliceAfter.Balance%100, aliceAfter.Version)
log.Printf(" Bob: balance=%d.%02d (version=%d)",
bobAfter.Balance/100, bobAfter.Balance%100, bobAfter.Version)
// ==========================================
// 5. Демонстрация ошибки (недостаточно средств)
// ==========================================
log.Println("\n── ПОПЫТКА ПЕРЕВОДА $200.00 (недостаточно) ──")
_, err = repo.TransferMoney(ctx, bob.ID, alice.ID, 20000, "Ошибка")
if errors.Is(err, ErrInsufficientFunds) {
log.Println(" ✓ Ожидаемая ошибка: недостаточно средств")
} else if err != nil {
log.Printf(" Ошибка: %v", err)
}
// ==========================================
// 6. Оптимистичная блокировка
// ==========================================
log.Println("\n── ОПТИМИСТИЧНАЯ БЛОКИРОВКА ──")
// Успешное обновление (версия совпадает)
updated, err := repo.UpdateBalanceOptimistic(ctx, alice.ID, aliceAfter.Version, 5000)
if err != nil {
log.Printf(" Ошибка (неожиданно): %v", err)
} else {
log.Printf(" ✓ Обновлён баланс Alice: %d.%02d (version=%d)",
updated.Balance/100, updated.Balance%100, updated.Version)
}
// Конфликт версий (используем старую версию)
_, err = repo.UpdateBalanceOptimistic(ctx, alice.ID, aliceAfter.Version, 9999)
if errors.Is(err, ErrVersionConflict) {
log.Println(" ✓ Ожидаемый конфликт версий")
}
// ==========================================
// 7. Batch-создание счетов
// ==========================================
log.Println("\n── BATCH СОЗДАНИЕ СЧЕТОВ ──")
batchAccounts := []struct {
Owner string
Type string
Balance int64
}{
{"Charlie", "savings", 15000},
{"Diana", "checking", 20000},
{"Eve", "credit", 0},
}
created, err := repo.CreateAccountsBatch(ctx, batchAccounts)
if err != nil {
log.Printf(" Ошибка batch: %v", err)
} else {
log.Printf(" ✓ Создано %d счетов:", len(created))
for _, acc := range created {
log.Printf(" - %s: %d.%02d [%s]",
acc.Owner, acc.Balance/100, acc.Balance%100, acc.AccountType)
}
}
log.Println("\n✅ Демонстрация завершена!")
}
# Применяем миграции и запускаем
DATABASE_URL="postgres://postgres:secret@localhost:5432/bankdb?sslmode=disable" \
go run main.go
# Только миграции (без запуска программы)
go run main.go -migrate-only
# Откат миграций
migrate -database "postgres://postgres:secret@localhost:5432/bankdb?sslmode=disable" \
-path migrations down
# Просмотр версии миграций
migrate -database "postgres://postgres:secret@localhost:5432/bankdb?sslmode=disable" \
-path migrations version
| Уровень | Грязное чтение | Неповторяющееся чтение | Фантомное чтение | Производительность |
|---|---|---|---|---|
| Read Uncommitted | ✅ Возможно | ✅ | ✅ | Максимальная |
| Read Committed (default) | ❌ | ✅ | ✅ | Высокая |
| Repeatable Read | ❌ | ❌ | ✅ (в PG — нет) | Средняя |
| Serializable | ❌ | ❌ | ❌ | Низкая |
| Характеристика | Пессимистичная (FOR UPDATE) | Оптимистичная (version) |
|---|---|---|
| Механизм | Блокирует строку в БД | Проверяет версию при обновлении |
| Конфликты | Ожидание снятия блокировки | Ошибка, нужно повторить |
| Производительность | Ниже (блокировки) | Выше (нет блокировок) |
| Подходит для | Высокая конкуренция за одну запись | Редкие конфликты |
| Deadlock | Возможен | Невозможен |
💡 Best practices от сеньоров:
💡 Для Node.js разработчика: