Урок 13: PostgreSQL с pgx — подключение, пул, CRUD

Урок 13. PostgreSQL с pgx — подключение, пул, CRUD

🔄 Node.js → Go (ключевые аналоги):

📋 Что изучаем

📦 Инициализация проекта

mkdir go-pgx-crud && cd go-pgx-crud
go mod init go-pgx-crud

# Устанавливаем pgx
go get github.com/jackc/pgx/v5
go get github.com/jackc/pgx/v5/pgxpool
go get github.com/google/uuid
go mod tidy

# Запускаем PostgreSQL (если есть Docker)
docker run -d --name pg-lesson13 \
  -e POSTGRES_PASSWORD=secret \
  -e POSTGRES_DB=tasksdb \
  -p 5432:5432 \
  postgres:16-alpine

🗄️ SQL для создания таблиц

# Подключаемся к БД
docker exec -it pg-lesson13 psql -U postgres -d tasksdb

# Создаём таблицу задач
-- Создание таблицы tasks
CREATE TABLE IF NOT EXISTS tasks (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    title       VARCHAR(255) NOT NULL,
    description TEXT DEFAULT '',
    status      VARCHAR(50) NOT NULL DEFAULT 'pending'
        CHECK (status IN ('pending', 'in_progress', 'done', 'cancelled')),
    priority    INT NOT NULL DEFAULT 0
        CHECK (priority >= 0 AND priority <= 5),
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    completed_at TIMESTAMPTZ
);

-- Индекс для поиска по статусу
CREATE INDEX idx_tasks_status ON tasks(status);

-- Индекс для поиска по приоритету
CREATE INDEX idx_tasks_priority ON tasks(priority DESC);

-- Индекс для полнотекстового поиска по заголовку
CREATE INDEX idx_tasks_title_trgm ON tasks USING gin (title gin_trgm_ops);

-- Триггер для автоматического обновления updated_at
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$ language 'plpgsql';

CREATE TRIGGER update_tasks_updated_at
    BEFORE UPDATE ON tasks
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at_column();

💻 Код программы

package main

import (
    "context"
    "errors"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgx/v5/pgconn"
    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/google/uuid"
)

// ╔══════════════════════════════════════════════════════════╗
// ║  1. МОДЕЛЬ ДАННЫХ                                      ║
// ╚══════════════════════════════════════════════════════════╝

// TaskStatus — кастомный тип для статуса задачи
type TaskStatus string

const (
    StatusPending    TaskStatus = "pending"
    StatusInProgress TaskStatus = "in_progress"
    StatusDone       TaskStatus = "done"
    StatusCancelled  TaskStatus = "cancelled"
)

// Task — модель задачи
type Task struct {
    ID          uuid.UUID  `json:"id"`
    Title       string     `json:"title"`
    Description string     `json:"description"`
    Status      TaskStatus `json:"status"`
    Priority    int        `json:"priority"`
    CreatedAt   time.Time  `json:"created_at"`
    UpdatedAt   time.Time  `json:"updated_at"`
    CompletedAt *time.Time `json:"completed_at,omitempty"` // Указатель — может быть NULL
}

// CreateTaskParams — параметры для создания задачи
type CreateTaskParams struct {
    Title       string `json:"title"`
    Description string `json:"description"`
    Priority    int    `json:"priority"`
}

// UpdateTaskParams — параметры для обновления задачи
type UpdateTaskParams struct {
    Title       *string     `json:"title,omitempty"`
    Description *string     `json:"description,omitempty"`
    Status      *TaskStatus `json:"status,omitempty"`
    Priority    *int        `json:"priority,omitempty"`
}

// TaskFilter — фильтры для поиска задач
type TaskFilter struct {
    Status   *TaskStatus
    Priority *int
    Limit    int
    Offset   int
}

// ╔══════════════════════════════════════════════════════════╗
// ║  2. ИНТЕРФЕЙС РЕПОЗИТОРИЯ                              ║
// ╚══════════════════════════════════════════════════════════╝

// TaskRepository — интерфейс для работы с задачами.
// Позволяет подменять реализацию (реальная БД / моки для тестов).
type TaskRepository interface {
    Create(ctx context.Context, params CreateTaskParams) (*Task, error)
    GetByID(ctx context.Context, id uuid.UUID) (*Task, error)
    List(ctx context.Context, filter TaskFilter) ([]Task, error)
    Update(ctx context.Context, id uuid.UUID, params UpdateTaskParams) (*Task, error)
    Delete(ctx context.Context, id uuid.UUID) error
    Count(ctx context.Context, filter TaskFilter) (int, error)
}

// ╔══════════════════════════════════════════════════════════╗
// ║  3. РЕАЛИЗАЦИЯ РЕПОЗИТОРИЯ НА PGX                     ║
// ╚══════════════════════════════════════════════════════════╝

// PgxTaskRepository — реализация TaskRepository на pgxpool
type PgxTaskRepository struct {
    pool *pgxpool.Pool
}

// NewPgxTaskRepository — конструктор
func NewPgxTaskRepository(pool *pgxpool.Pool) *PgxTaskRepository {
    return &PgxTaskRepository{pool: pool}
}

// Create — создаёт задачу
func (r *PgxTaskRepository) Create(ctx context.Context, params CreateTaskParams) (*Task, error) {
    // SQL-запрос с RETURNING — возвращает созданную строку.
    // $1, $2, $3 — плейсхолдеры PostgreSQL (НЕ ? как в MySQL!).
    query := `
        INSERT INTO tasks (title, description, priority)
        VALUES ($1, $2, $3)
        RETURNING id, title, description, status, priority, created_at, updated_at, completed_at
    `

    task := &Task{}
    err := r.pool.QueryRow(ctx, query,
        params.Title,
        params.Description,
        params.Priority,
    ).Scan(
        &task.ID,
        &task.Title,
        &task.Description,
        &task.Status,
        &task.Priority,
        &task.CreatedAt,
        &task.UpdatedAt,
        &task.CompletedAt, // *time.Time — pgx сам обработает NULL
    )

    if err != nil {
        return nil, fmt.Errorf("create task: %w", err)
    }
    return task, nil
}

// GetByID — получает задачу по ID
func (r *PgxTaskRepository) GetByID(ctx context.Context, id uuid.UUID) (*Task, error) {
    query := `
        SELECT id, title, description, status, priority, created_at, updated_at, completed_at
        FROM tasks
        WHERE id = $1
    `

    task := &Task{}
    err := r.pool.QueryRow(ctx, query, id).Scan(
        &task.ID,
        &task.Title,
        &task.Description,
        &task.Status,
        &task.Priority,
        &task.CreatedAt,
        &task.UpdatedAt,
        &task.CompletedAt,
    )

    if err != nil {
        // pgx.ErrNoRows — аналог sql.ErrNoRows
        if errors.Is(err, pgx.ErrNoRows) {
            return nil, ErrNotFound
        }
        return nil, fmt.Errorf("get task by id: %w", err)
    }
    return task, nil
}

// List — возвращает список задач с фильтрацией и пагинацией
func (r *PgxTaskRepository) List(ctx context.Context, filter TaskFilter) ([]Task, error) {
    // Динамическое построение запроса (простой вариант)
    query := `
        SELECT id, title, description, status, priority, created_at, updated_at, completed_at
        FROM tasks
        WHERE 1=1
    `
    args := []any{}
    argNum := 1

    if filter.Status != nil {
        query += fmt.Sprintf(" AND status = $%d", argNum)
        args = append(args, *filter.Status)
        argNum++
    }

    if filter.Priority != nil {
        query += fmt.Sprintf(" AND priority = $%d", argNum)
        args = append(args, *filter.Priority)
        argNum++
    }

    query += " ORDER BY priority DESC, created_at DESC"

    if filter.Limit > 0 {
        query += fmt.Sprintf(" LIMIT $%d", argNum)
        args = append(args, filter.Limit)
        argNum++
    }

    if filter.Offset > 0 {
        query += fmt.Sprintf(" OFFSET $%d", argNum)
        args = append(args, filter.Offset)
        argNum++
    }

    rows, err := r.pool.Query(ctx, query, args...)
    if err != nil {
        return nil, fmt.Errorf("list tasks: %w", err)
    }
    defer rows.Close()

    // Итерация по результатам
    var tasks []Task
    for rows.Next() {
        var task Task
        if err := rows.Scan(
            &task.ID,
            &task.Title,
            &task.Description,
            &task.Status,
            &task.Priority,
            &task.CreatedAt,
            &task.UpdatedAt,
            &task.CompletedAt,
        ); err != nil {
            return nil, fmt.Errorf("scan task: %w", err)
        }
        tasks = append(tasks, task)
    }

    // Проверяем ошибки после цикла
    if err := rows.Err(); err != nil {
        return nil, fmt.Errorf("rows iteration: %w", err)
    }

    // Возвращаем пустой слайс, а не nil
    if tasks == nil {
        tasks = []Task{}
    }
    return tasks, nil
}

// Update — обновляет задачу
func (r *PgxTaskRepository) Update(ctx context.Context, id uuid.UUID, params UpdateTaskParams) (*Task, error) {
    // Обновление только переданных полей
    query := "UPDATE tasks SET "
    args := []any{id}
    argNum := 2
    setClauses := []string{}

    if params.Title != nil {
        setClauses = append(setClauses, fmt.Sprintf("title = $%d", argNum))
        args = append(args, *params.Title)
        argNum++
    }

    if params.Description != nil {
        setClauses = append(setClauses, fmt.Sprintf("description = $%d", argNum))
        args = append(args, *params.Description)
        argNum++
    }

    if params.Status != nil {
        setClauses = append(setClauses, fmt.Sprintf("status = $%d", argNum))
        args = append(args, *params.Status)
        argNum++
        // Если задача завершена — ставим completed_at
        if *params.Status == StatusDone {
            setClauses = append(setClauses, fmt.Sprintf("completed_at = NOW()"))
        }
    }

    if params.Priority != nil {
        setClauses = append(setClauses, fmt.Sprintf("priority = $%d", argNum))
        args = append(args, *params.Priority)
        argNum++
    }

    if len(setClauses) == 0 {
        return r.GetByID(ctx, id) // Нечего обновлять
    }

    query += strings.Join(setClauses, ", ")
    query += fmt.Sprintf(" WHERE id = $1 RETURNING id, title, description, status, priority, created_at, updated_at, completed_at")

    task := &Task{}
    err := r.pool.QueryRow(ctx, query, args...).Scan(
        &task.ID,
        &task.Title,
        &task.Description,
        &task.Status,
        &task.Priority,
        &task.CreatedAt,
        &task.UpdatedAt,
        &task.CompletedAt,
    )

    if err != nil {
        if errors.Is(err, pgx.ErrNoRows) {
            return nil, ErrNotFound
        }
        return nil, fmt.Errorf("update task: %w", err)
    }
    return task, nil
}

// Delete — удаляет задачу
func (r *PgxTaskRepository) Delete(ctx context.Context, id uuid.UUID) error {
    result, err := r.pool.Exec(ctx, "DELETE FROM tasks WHERE id = $1", id)
    if err != nil {
        return fmt.Errorf("delete task: %w", err)
    }

    // Проверяем, была ли удалена строка
    if result.RowsAffected() == 0 {
        return ErrNotFound
    }
    return nil
}

// Count — подсчёт задач по фильтру
func (r *PgxTaskRepository) Count(ctx context.Context, filter TaskFilter) (int, error) {
    query := "SELECT COUNT(*) FROM tasks WHERE 1=1"
    args := []any{}
    argNum := 1

    if filter.Status != nil {
        query += fmt.Sprintf(" AND status = $%d", argNum)
        args = append(args, *filter.Status)
        argNum++
    }

    if filter.Priority != nil {
        query += fmt.Sprintf(" AND priority = $%d", argNum)
        args = append(args, *filter.Priority)
        argNum++
    }

    var count int
    if err := r.pool.QueryRow(ctx, query, args...).Scan(&count); err != nil {
        return 0, fmt.Errorf("count tasks: %w", err)
    }
    return count, nil
}

// ╔══════════════════════════════════════════════════════════╗
// ║  4. ОШИБКИ И ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ                   ║
// ╚══════════════════════════════════════════════════════════╝

var (
    ErrNotFound = errors.New("task not found")
)

// ╔══════════════════════════════════════════════════════════╗
// ║  5. ДЕМОНСТРАЦИЯ РАБОТЫ                                ║
// ╚══════════════════════════════════════════════════════════╝

func main() {
    log.SetFlags(log.LstdFlags | log.Lmicroseconds)
    log.Println("🚀 Подключение к PostgreSQL...")

    // Читаем конфигурацию из переменных окружения
    dbURL := os.Getenv("DATABASE_URL")
    if dbURL == "" {
        dbURL = "postgres://postgres:secret@localhost:5432/tasksdb?sslmode=disable"
    }

    // Создаём контекст с таймаутом для подключения
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // pgxpool.New — создаёт пул соединений.
    // Аналог new pg.Pool({ connectionString }) в Node.js.
    pool, err := pgxpool.New(ctx, dbURL)
    if err != nil {
        log.Fatalf("Не удалось создать пул: %v", err)
    }
    // defer pool.Close() — закроется при выходе из main

    // Проверяем соединение (Ping)
    if err := pool.Ping(ctx); err != nil {
        log.Fatalf("Не удалось подключиться к БД: %v", err)
    }
    log.Println("✅ Подключено к PostgreSQL")

    // Статистика пула
    stats := pool.Stat()
    log.Printf("Пул: всего соединений=%d, свободных=%d, занятых=%d",
        stats.TotalConns(), stats.FreeConns(), stats.AcquiredConns())

    // Создаём репозиторий
    repo := NewPgxTaskRepository(pool)

    // ==========================================
    // ДЕМОНСТРАЦИЯ CRUD
    // ==========================================
    demoCtx := context.Background()

    // 1. CREATE — создаём три задачи
    log.Println("\n── CREATE ──")
    task1, err := repo.Create(demoCtx, CreateTaskParams{
        Title:       "Изучить Go",
        Description: "Пройти курс из 40 уроков",
        Priority:    5,
    })
    if err != nil {
        log.Printf("Ошибка создания: %v", err)
    } else {
        log.Printf("Создана задача: %s (ID: %s)", task1.Title, task1.ID)
    }

    task2, _ := repo.Create(demoCtx, CreateTaskParams{
        Title:       "Написать микросервис",
        Description: "Использовать чистую архитектуру",
        Priority:    4,
    })
    log.Printf("Создана задача: %s (ID: %s)", task2.Title, task2.ID)

    task3, _ := repo.Create(demoCtx, CreateTaskParams{
        Title:       "Настроить CI/CD",
        Description: "GitHub Actions + Docker",
        Priority:    3,
    })
    log.Printf("Создана задача: %s (ID: %s)", task3.Title, task3.ID)

    // 2. READ — получаем все задачи
    log.Println("\n── LIST ──")
    tasks, err := repo.List(demoCtx, TaskFilter{Limit: 10})
    if err != nil {
        log.Printf("Ошибка получения списка: %v", err)
    } else {
        log.Printf("Всего задач: %d", len(tasks))
        for _, t := range tasks {
            log.Printf("  - [%s] %s (priority=%d)", t.Status, t.Title, t.Priority)
        }
    }

    // 3. READ by ID
    log.Println("\n── GET BY ID ──")
    found, err := repo.GetByID(demoCtx, task1.ID)
    if err != nil {
        log.Printf("Ошибка поиска: %v", err)
    } else {
        log.Printf("Найдена: %s (создана: %s)", found.Title, found.CreatedAt.Format(time.RFC3339))
    }

    // Попытка найти несуществующую задачу
    _, err = repo.GetByID(demoCtx, uuid.New())
    if errors.Is(err, ErrNotFound) {
        log.Println("Несуществующая задача: not found (ожидаемо)")
    }

    // 4. UPDATE
    log.Println("\n── UPDATE ──")
    inProgress := StatusInProgress
    updated, err := repo.Update(demoCtx, task1.ID, UpdateTaskParams{
        Status: &inProgress,
    })
    if err != nil {
        log.Printf("Ошибка обновления: %v", err)
    } else {
        log.Printf("Обновлена: [%s] %s", updated.Status, updated.Title)
    }

    // Завершаем задачу
    done := StatusDone
    completed, err := repo.Update(demoCtx, task1.ID, UpdateTaskParams{
        Status: &done,
    })
    if err != nil {
        log.Printf("Ошибка завершения: %v", err)
    } else {
        log.Printf("Завершена: [%s] %s (completed_at: %s)",
            completed.Status, completed.Title,
            completed.CompletedAt.Format(time.RFC3339))
    }

    // 5. COUNT
    log.Println("\n── COUNT ──")
    count, err := repo.Count(demoCtx, TaskFilter{Status: &done})
    if err != nil {
        log.Printf("Ошибка подсчёта: %v", err)
    } else {
        log.Printf("Завершённых задач: %d", count)
    }

    // 6. DELETE
    log.Println("\n── DELETE ──")
    if err := repo.Delete(demoCtx, task3.ID); err != nil {
        log.Printf("Ошибка удаления: %v", err)
    } else {
        log.Println("Задача удалена")
    }

    // Проверяем удаление
    tasks, _ = repo.List(demoCtx, TaskFilter{Limit: 10})
    log.Printf("После удаления: %d задач", len(tasks))

    // ==========================================
    // GRACEFUL SHUTDOWN
    // ==========================================
    log.Println("\n🛑 Нажмите Ctrl+C для выхода...")
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit

    log.Println("Закрываем пул соединений...")
    pool.Close()
    log.Println("✅ Готово")
}

// strings import
import "strings"

⚠️ Важно: Добавьте "strings" в блок импортов в начале файла.

🧪 Тестирование

# Запускаем PostgreSQL (если ещё не)
docker run -d --name pg-lesson13 \
  -e POSTGRES_PASSWORD=secret \
  -e POSTGRES_DB=tasksdb \
  -p 5432:5432 \
  postgres:16-alpine

# Создаём таблицы (подключаемся к БД)
docker exec -i pg-lesson13 psql -U postgres -d tasksdb < schema.sql

# Запускаем программу
DATABASE_URL="postgres://postgres:secret@localhost:5432/tasksdb?sslmode=disable" \
  go run main.go

# Или скомпилировать и запустить
go build -o pgx-demo main.go
./pgx-demo

📊 Методы pgxpool

МетодНазначениеКогда использовать
QueryRow(ctx, sql, args...)Одна строка результатаSELECT … WHERE id = $1
Query(ctx, sql, args...)Несколько строкSELECT … WHERE status = $1
Exec(ctx, sql, args...)Без возврата строкINSERT, UPDATE, DELETE
SendBatch(ctx, batch)Пакет запросовМножественные INSERT
Begin(ctx)Начало транзакцииСм. урок 14

🚀 Запуск программы

# Запуск с локальной БД
DATABASE_URL="postgres://postgres:secret@localhost:5432/tasksdb?sslmode=disable" \
  go run main.go

# Сборка
go build -o pgx-demo main.go
./pgx-demo
⚠️ Типичные ошибки:
💡 Практический совет:

💡 Best practices от сеньоров:

🔑 Ключевые концепции

Что нужно запомнить из этого урока:

💡 Для Node.js разработчика:

← Предыдущий урок Следующий урок →