pg (node-postgres) → github.com/jackc/pgx/v5
new pg.Pool(config) → pgxpool.New(ctx, connString)
await pool.query(sql, [val1, val2]) → pool.Query(ctx, sql, val1, val2)
$1, $2 вместо ? (MySQL) — PostgreSQL-стиль
rows.rows[0] → rows.Scan(&val1, &val2) — сканирование в переменные
ORM (Prisma, TypeORM) → чистый SQL через pgx (или sqlc)
QueryRow vs Query — одна строка vs много строкpgx.ErrNoRows — проверка “не найдено”mkdir go-pgx-crud && cd go-pgx-crud
go mod init go-pgx-crud
# Устанавливаем pgx
go get github.com/jackc/pgx/v5
go get github.com/jackc/pgx/v5/pgxpool
go get github.com/google/uuid
go mod tidy
# Запускаем PostgreSQL (если есть Docker)
docker run -d --name pg-lesson13 \
-e POSTGRES_PASSWORD=secret \
-e POSTGRES_DB=tasksdb \
-p 5432:5432 \
postgres:16-alpine
# Подключаемся к БД
docker exec -it pg-lesson13 psql -U postgres -d tasksdb
# Создаём таблицу задач
-- Создание таблицы tasks
CREATE TABLE IF NOT EXISTS tasks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
title VARCHAR(255) NOT NULL,
description TEXT DEFAULT '',
status VARCHAR(50) NOT NULL DEFAULT 'pending'
CHECK (status IN ('pending', 'in_progress', 'done', 'cancelled')),
priority INT NOT NULL DEFAULT 0
CHECK (priority >= 0 AND priority <= 5),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
completed_at TIMESTAMPTZ
);
-- Индекс для поиска по статусу
CREATE INDEX idx_tasks_status ON tasks(status);
-- Индекс для поиска по приоритету
CREATE INDEX idx_tasks_priority ON tasks(priority DESC);
-- Индекс для полнотекстового поиска по заголовку
CREATE INDEX idx_tasks_title_trgm ON tasks USING gin (title gin_trgm_ops);
-- Триггер для автоматического обновления updated_at
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$ language 'plpgsql';
CREATE TRIGGER update_tasks_updated_at
BEFORE UPDATE ON tasks
FOR EACH ROW
EXECUTE FUNCTION update_updated_at_column();
package main
import (
"context"
"errors"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/google/uuid"
)
// ╔══════════════════════════════════════════════════════════╗
// ║ 1. МОДЕЛЬ ДАННЫХ ║
// ╚══════════════════════════════════════════════════════════╝
// TaskStatus — кастомный тип для статуса задачи
type TaskStatus string
const (
StatusPending TaskStatus = "pending"
StatusInProgress TaskStatus = "in_progress"
StatusDone TaskStatus = "done"
StatusCancelled TaskStatus = "cancelled"
)
// Task — модель задачи
type Task struct {
ID uuid.UUID `json:"id"`
Title string `json:"title"`
Description string `json:"description"`
Status TaskStatus `json:"status"`
Priority int `json:"priority"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
CompletedAt *time.Time `json:"completed_at,omitempty"` // Указатель — может быть NULL
}
// CreateTaskParams — параметры для создания задачи
type CreateTaskParams struct {
Title string `json:"title"`
Description string `json:"description"`
Priority int `json:"priority"`
}
// UpdateTaskParams — параметры для обновления задачи
type UpdateTaskParams struct {
Title *string `json:"title,omitempty"`
Description *string `json:"description,omitempty"`
Status *TaskStatus `json:"status,omitempty"`
Priority *int `json:"priority,omitempty"`
}
// TaskFilter — фильтры для поиска задач
type TaskFilter struct {
Status *TaskStatus
Priority *int
Limit int
Offset int
}
// ╔══════════════════════════════════════════════════════════╗
// ║ 2. ИНТЕРФЕЙС РЕПОЗИТОРИЯ ║
// ╚══════════════════════════════════════════════════════════╝
// TaskRepository — интерфейс для работы с задачами.
// Позволяет подменять реализацию (реальная БД / моки для тестов).
type TaskRepository interface {
Create(ctx context.Context, params CreateTaskParams) (*Task, error)
GetByID(ctx context.Context, id uuid.UUID) (*Task, error)
List(ctx context.Context, filter TaskFilter) ([]Task, error)
Update(ctx context.Context, id uuid.UUID, params UpdateTaskParams) (*Task, error)
Delete(ctx context.Context, id uuid.UUID) error
Count(ctx context.Context, filter TaskFilter) (int, error)
}
// ╔══════════════════════════════════════════════════════════╗
// ║ 3. РЕАЛИЗАЦИЯ РЕПОЗИТОРИЯ НА PGX ║
// ╚══════════════════════════════════════════════════════════╝
// PgxTaskRepository — реализация TaskRepository на pgxpool
type PgxTaskRepository struct {
pool *pgxpool.Pool
}
// NewPgxTaskRepository — конструктор
func NewPgxTaskRepository(pool *pgxpool.Pool) *PgxTaskRepository {
return &PgxTaskRepository{pool: pool}
}
// Create — создаёт задачу
func (r *PgxTaskRepository) Create(ctx context.Context, params CreateTaskParams) (*Task, error) {
// SQL-запрос с RETURNING — возвращает созданную строку.
// $1, $2, $3 — плейсхолдеры PostgreSQL (НЕ ? как в MySQL!).
query := `
INSERT INTO tasks (title, description, priority)
VALUES ($1, $2, $3)
RETURNING id, title, description, status, priority, created_at, updated_at, completed_at
`
task := &Task{}
err := r.pool.QueryRow(ctx, query,
params.Title,
params.Description,
params.Priority,
).Scan(
&task.ID,
&task.Title,
&task.Description,
&task.Status,
&task.Priority,
&task.CreatedAt,
&task.UpdatedAt,
&task.CompletedAt, // *time.Time — pgx сам обработает NULL
)
if err != nil {
return nil, fmt.Errorf("create task: %w", err)
}
return task, nil
}
// GetByID — получает задачу по ID
func (r *PgxTaskRepository) GetByID(ctx context.Context, id uuid.UUID) (*Task, error) {
query := `
SELECT id, title, description, status, priority, created_at, updated_at, completed_at
FROM tasks
WHERE id = $1
`
task := &Task{}
err := r.pool.QueryRow(ctx, query, id).Scan(
&task.ID,
&task.Title,
&task.Description,
&task.Status,
&task.Priority,
&task.CreatedAt,
&task.UpdatedAt,
&task.CompletedAt,
)
if err != nil {
// pgx.ErrNoRows — аналог sql.ErrNoRows
if errors.Is(err, pgx.ErrNoRows) {
return nil, ErrNotFound
}
return nil, fmt.Errorf("get task by id: %w", err)
}
return task, nil
}
// List — возвращает список задач с фильтрацией и пагинацией
func (r *PgxTaskRepository) List(ctx context.Context, filter TaskFilter) ([]Task, error) {
// Динамическое построение запроса (простой вариант)
query := `
SELECT id, title, description, status, priority, created_at, updated_at, completed_at
FROM tasks
WHERE 1=1
`
args := []any{}
argNum := 1
if filter.Status != nil {
query += fmt.Sprintf(" AND status = $%d", argNum)
args = append(args, *filter.Status)
argNum++
}
if filter.Priority != nil {
query += fmt.Sprintf(" AND priority = $%d", argNum)
args = append(args, *filter.Priority)
argNum++
}
query += " ORDER BY priority DESC, created_at DESC"
if filter.Limit > 0 {
query += fmt.Sprintf(" LIMIT $%d", argNum)
args = append(args, filter.Limit)
argNum++
}
if filter.Offset > 0 {
query += fmt.Sprintf(" OFFSET $%d", argNum)
args = append(args, filter.Offset)
argNum++
}
rows, err := r.pool.Query(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("list tasks: %w", err)
}
defer rows.Close()
// Итерация по результатам
var tasks []Task
for rows.Next() {
var task Task
if err := rows.Scan(
&task.ID,
&task.Title,
&task.Description,
&task.Status,
&task.Priority,
&task.CreatedAt,
&task.UpdatedAt,
&task.CompletedAt,
); err != nil {
return nil, fmt.Errorf("scan task: %w", err)
}
tasks = append(tasks, task)
}
// Проверяем ошибки после цикла
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("rows iteration: %w", err)
}
// Возвращаем пустой слайс, а не nil
if tasks == nil {
tasks = []Task{}
}
return tasks, nil
}
// Update — обновляет задачу
func (r *PgxTaskRepository) Update(ctx context.Context, id uuid.UUID, params UpdateTaskParams) (*Task, error) {
// Обновление только переданных полей
query := "UPDATE tasks SET "
args := []any{id}
argNum := 2
setClauses := []string{}
if params.Title != nil {
setClauses = append(setClauses, fmt.Sprintf("title = $%d", argNum))
args = append(args, *params.Title)
argNum++
}
if params.Description != nil {
setClauses = append(setClauses, fmt.Sprintf("description = $%d", argNum))
args = append(args, *params.Description)
argNum++
}
if params.Status != nil {
setClauses = append(setClauses, fmt.Sprintf("status = $%d", argNum))
args = append(args, *params.Status)
argNum++
// Если задача завершена — ставим completed_at
if *params.Status == StatusDone {
setClauses = append(setClauses, fmt.Sprintf("completed_at = NOW()"))
}
}
if params.Priority != nil {
setClauses = append(setClauses, fmt.Sprintf("priority = $%d", argNum))
args = append(args, *params.Priority)
argNum++
}
if len(setClauses) == 0 {
return r.GetByID(ctx, id) // Нечего обновлять
}
query += strings.Join(setClauses, ", ")
query += fmt.Sprintf(" WHERE id = $1 RETURNING id, title, description, status, priority, created_at, updated_at, completed_at")
task := &Task{}
err := r.pool.QueryRow(ctx, query, args...).Scan(
&task.ID,
&task.Title,
&task.Description,
&task.Status,
&task.Priority,
&task.CreatedAt,
&task.UpdatedAt,
&task.CompletedAt,
)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, ErrNotFound
}
return nil, fmt.Errorf("update task: %w", err)
}
return task, nil
}
// Delete — удаляет задачу
func (r *PgxTaskRepository) Delete(ctx context.Context, id uuid.UUID) error {
result, err := r.pool.Exec(ctx, "DELETE FROM tasks WHERE id = $1", id)
if err != nil {
return fmt.Errorf("delete task: %w", err)
}
// Проверяем, была ли удалена строка
if result.RowsAffected() == 0 {
return ErrNotFound
}
return nil
}
// Count — подсчёт задач по фильтру
func (r *PgxTaskRepository) Count(ctx context.Context, filter TaskFilter) (int, error) {
query := "SELECT COUNT(*) FROM tasks WHERE 1=1"
args := []any{}
argNum := 1
if filter.Status != nil {
query += fmt.Sprintf(" AND status = $%d", argNum)
args = append(args, *filter.Status)
argNum++
}
if filter.Priority != nil {
query += fmt.Sprintf(" AND priority = $%d", argNum)
args = append(args, *filter.Priority)
argNum++
}
var count int
if err := r.pool.QueryRow(ctx, query, args...).Scan(&count); err != nil {
return 0, fmt.Errorf("count tasks: %w", err)
}
return count, nil
}
// ╔══════════════════════════════════════════════════════════╗
// ║ 4. ОШИБКИ И ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ ║
// ╚══════════════════════════════════════════════════════════╝
var (
ErrNotFound = errors.New("task not found")
)
// ╔══════════════════════════════════════════════════════════╗
// ║ 5. ДЕМОНСТРАЦИЯ РАБОТЫ ║
// ╚══════════════════════════════════════════════════════════╝
func main() {
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
log.Println("🚀 Подключение к PostgreSQL...")
// Читаем конфигурацию из переменных окружения
dbURL := os.Getenv("DATABASE_URL")
if dbURL == "" {
dbURL = "postgres://postgres:secret@localhost:5432/tasksdb?sslmode=disable"
}
// Создаём контекст с таймаутом для подключения
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// pgxpool.New — создаёт пул соединений.
// Аналог new pg.Pool({ connectionString }) в Node.js.
pool, err := pgxpool.New(ctx, dbURL)
if err != nil {
log.Fatalf("Не удалось создать пул: %v", err)
}
// defer pool.Close() — закроется при выходе из main
// Проверяем соединение (Ping)
if err := pool.Ping(ctx); err != nil {
log.Fatalf("Не удалось подключиться к БД: %v", err)
}
log.Println("✅ Подключено к PostgreSQL")
// Статистика пула
stats := pool.Stat()
log.Printf("Пул: всего соединений=%d, свободных=%d, занятых=%d",
stats.TotalConns(), stats.FreeConns(), stats.AcquiredConns())
// Создаём репозиторий
repo := NewPgxTaskRepository(pool)
// ==========================================
// ДЕМОНСТРАЦИЯ CRUD
// ==========================================
demoCtx := context.Background()
// 1. CREATE — создаём три задачи
log.Println("\n── CREATE ──")
task1, err := repo.Create(demoCtx, CreateTaskParams{
Title: "Изучить Go",
Description: "Пройти курс из 40 уроков",
Priority: 5,
})
if err != nil {
log.Printf("Ошибка создания: %v", err)
} else {
log.Printf("Создана задача: %s (ID: %s)", task1.Title, task1.ID)
}
task2, _ := repo.Create(demoCtx, CreateTaskParams{
Title: "Написать микросервис",
Description: "Использовать чистую архитектуру",
Priority: 4,
})
log.Printf("Создана задача: %s (ID: %s)", task2.Title, task2.ID)
task3, _ := repo.Create(demoCtx, CreateTaskParams{
Title: "Настроить CI/CD",
Description: "GitHub Actions + Docker",
Priority: 3,
})
log.Printf("Создана задача: %s (ID: %s)", task3.Title, task3.ID)
// 2. READ — получаем все задачи
log.Println("\n── LIST ──")
tasks, err := repo.List(demoCtx, TaskFilter{Limit: 10})
if err != nil {
log.Printf("Ошибка получения списка: %v", err)
} else {
log.Printf("Всего задач: %d", len(tasks))
for _, t := range tasks {
log.Printf(" - [%s] %s (priority=%d)", t.Status, t.Title, t.Priority)
}
}
// 3. READ by ID
log.Println("\n── GET BY ID ──")
found, err := repo.GetByID(demoCtx, task1.ID)
if err != nil {
log.Printf("Ошибка поиска: %v", err)
} else {
log.Printf("Найдена: %s (создана: %s)", found.Title, found.CreatedAt.Format(time.RFC3339))
}
// Попытка найти несуществующую задачу
_, err = repo.GetByID(demoCtx, uuid.New())
if errors.Is(err, ErrNotFound) {
log.Println("Несуществующая задача: not found (ожидаемо)")
}
// 4. UPDATE
log.Println("\n── UPDATE ──")
inProgress := StatusInProgress
updated, err := repo.Update(demoCtx, task1.ID, UpdateTaskParams{
Status: &inProgress,
})
if err != nil {
log.Printf("Ошибка обновления: %v", err)
} else {
log.Printf("Обновлена: [%s] %s", updated.Status, updated.Title)
}
// Завершаем задачу
done := StatusDone
completed, err := repo.Update(demoCtx, task1.ID, UpdateTaskParams{
Status: &done,
})
if err != nil {
log.Printf("Ошибка завершения: %v", err)
} else {
log.Printf("Завершена: [%s] %s (completed_at: %s)",
completed.Status, completed.Title,
completed.CompletedAt.Format(time.RFC3339))
}
// 5. COUNT
log.Println("\n── COUNT ──")
count, err := repo.Count(demoCtx, TaskFilter{Status: &done})
if err != nil {
log.Printf("Ошибка подсчёта: %v", err)
} else {
log.Printf("Завершённых задач: %d", count)
}
// 6. DELETE
log.Println("\n── DELETE ──")
if err := repo.Delete(demoCtx, task3.ID); err != nil {
log.Printf("Ошибка удаления: %v", err)
} else {
log.Println("Задача удалена")
}
// Проверяем удаление
tasks, _ = repo.List(demoCtx, TaskFilter{Limit: 10})
log.Printf("После удаления: %d задач", len(tasks))
// ==========================================
// GRACEFUL SHUTDOWN
// ==========================================
log.Println("\n🛑 Нажмите Ctrl+C для выхода...")
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Println("Закрываем пул соединений...")
pool.Close()
log.Println("✅ Готово")
}
// strings import
import "strings"
⚠️ Важно: Добавьте "strings" в блок импортов в начале файла.
# Запускаем PostgreSQL (если ещё не)
docker run -d --name pg-lesson13 \
-e POSTGRES_PASSWORD=secret \
-e POSTGRES_DB=tasksdb \
-p 5432:5432 \
postgres:16-alpine
# Создаём таблицы (подключаемся к БД)
docker exec -i pg-lesson13 psql -U postgres -d tasksdb < schema.sql
# Запускаем программу
DATABASE_URL="postgres://postgres:secret@localhost:5432/tasksdb?sslmode=disable" \
go run main.go
# Или скомпилировать и запустить
go build -o pgx-demo main.go
./pgx-demo
| Метод | Назначение | Когда использовать |
|---|---|---|
QueryRow(ctx, sql, args...) | Одна строка результата | SELECT … WHERE id = $1 |
Query(ctx, sql, args...) | Несколько строк | SELECT … WHERE status = $1 |
Exec(ctx, sql, args...) | Без возврата строк | INSERT, UPDATE, DELETE |
SendBatch(ctx, batch) | Пакет запросов | Множественные INSERT |
Begin(ctx) | Начало транзакции | См. урок 14 |
# Запуск с локальной БД
DATABASE_URL="postgres://postgres:secret@localhost:5432/tasksdb?sslmode=disable" \
go run main.go
# Сборка
go build -o pgx-demo main.go
./pgx-demo
defer rows.Close() — утечка соединений.
💡 Best practices от сеньоров:
💡 Для Node.js разработчика:
pgxpool — прямой аналог pg.Pool из node-postgres.
await pool.query() → pool.Query(ctx, ...). Блокирует горутину, но не поток ОС.rows[0] → rows.Scan(&var1, &var2) — явное сканирование в переменные.