Урок 10: Синхронизация — мьютексы, WaitGroup, atomic, errgroup

Урок 10. Синхронизация — мьютексы, WaitGroup, atomic, errgroup

🔄 Node.js → Go (ключевые аналоги):

📋 Что изучаем

📦 Инициализация проекта

mkdir go-sync && cd go-sync
go mod init go-sync

# Устанавливаем errgroup
go get golang.org/x/sync/errgroup
go mod tidy

💻 Код программы

package main

import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "sync/atomic"
    "time"

    "golang.org/x/sync/errgroup"
    "golang.org/x/sync/semaphore"
)

// ╔══════════════════════════════════════════════════════════╗
// ║  1. SYNC.MUTEX — взаимное исключение                   ║
// ╚══════════════════════════════════════════════════════════╝

// Counter — потокобезопасный счётчик с мьютексом
type Counter struct {
    mu    sync.Mutex // Мьютекс защищает value
    value int
}

// Increment — увеличивает счётчик (потокобезопасно)
func (c *Counter) Increment() {
    c.mu.Lock()         // Захватываем блокировку
    defer c.mu.Unlock() // Гарантированно освобождаем (даже при панике)
    c.value++
}

// Value — возвращает текущее значение (потокобезопасно)
func (c *Counter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

func demoMutex() {
    fmt.Println("── 1. SYNC.MUTEX ──")

    var wg sync.WaitGroup
    counter := &Counter{}

    // Запускаем 1000 горутин, каждая инкрементирует 100 раз
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                counter.Increment()
            }
        }()
    }

    wg.Wait()
    fmt.Printf("  Итоговое значение: %d (ожидаем 100000)\n", counter.Value())
    fmt.Println()
}

// ╔══════════════════════════════════════════════════════════╗
// ║  2. SYNC.RWMUTEX — read-write блокировка               ║
// ╚══════════════════════════════════════════════════════════╝

// Cache — потокобезопасный кэш с RWMutex
type Cache struct {
    mu   sync.RWMutex
    data map[string]string
}

func NewCache() *Cache {
    return &Cache{data: make(map[string]string)}
}

// Get — множественное чтение (RLock)
func (c *Cache) Get(key string) (string, bool) {
    c.mu.RLock()         // Несколько читателей могут заходить одновременно
    defer c.mu.RUnlock()
    val, ok := c.data[key]
    return val, ok
}

// Set — эксклюзивная запись (Lock)
func (c *Cache) Set(key, value string) {
    c.mu.Lock()          // Только ОДИН писатель, читатели ждут
    defer c.mu.Unlock()
    c.data[key] = value
}

// Size — возвращает размер кэша
func (c *Cache) Size() int {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return len(c.data)
}

func demoRWMutex() {
    fmt.Println("── 2. SYNC.RWMUTEX ──")

    cache := NewCache()
    var wg sync.WaitGroup

    // 5 писателей
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            key := fmt.Sprintf("key-%d", id)
            cache.Set(key, fmt.Sprintf("value-%d", id))
        }(i)
    }

    // 100 читателей
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                cache.Get(fmt.Sprintf("key-%d", rand.Intn(5)))
            }
        }()
    }

    wg.Wait()
    fmt.Printf("  Размер кэша: %d\n", cache.Size())
    fmt.Println()
}

// ╔══════════════════════════════════════════════════════════╗
// ║  3. SYNC.WAITGROUP — ожидание группы горутин           ║
// ╚══════════════════════════════════════════════════════════╝

func demoWaitGroup() {
    fmt.Println("── 3. SYNC.WAITGROUP ──")

    var wg sync.WaitGroup

    // Запускаем 5 задач параллельно
    for i := 1; i <= 5; i++ {
        wg.Add(1) // Увеличиваем счётчик ДО запуска горутины

        go func(id int) {
            defer wg.Done() // Уменьшаем счётчик при завершении

            // Имитация работы
            workTime := time.Duration(rand.Intn(300)) * time.Millisecond
            fmt.Printf("  Задача %d: работаю %v...\n", id, workTime)
            time.Sleep(workTime)
            fmt.Printf("  Задача %d: завершена\n", id)
        }(i)
    }

    fmt.Println("  Ожидание завершения всех задач...")
    wg.Wait() // Блокируется, пока счётчик не станет 0
    fmt.Println("  Все задачи завершены!")
    fmt.Println()
}

// ╔══════════════════════════════════════════════════════════╗
// ║  4. SYNC.ONCE — однократное выполнение                 ║
// ╚══════════════════════════════════════════════════════════╝

type Database struct {
    once     sync.Once
    connStr  string
    connected bool
}

func (db *Database) Connect() {
    // Функция внутри Do выполнится ТОЛЬКО ОДИН РАЗ,
    // даже если Connect вызовут из многих горутин одновременно.
    db.once.Do(func() {
        fmt.Println("    Выполняем подключение к БД...")
        time.Sleep(100 * time.Millisecond)
        db.connected = true
        fmt.Println("    Подключение установлено!")
    })
}

func (db *Database) IsConnected() bool {
    return db.connected
}

func demoOnce() {
    fmt.Println("── 4. SYNC.ONCE ──")

    db := &Database{connStr: "postgres://localhost:5432/mydb"}
    var wg sync.WaitGroup

    // 10 горутин одновременно вызывают Connect
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            fmt.Printf("  Горутина %d: вызываю Connect()\n", id)
            db.Connect()
            fmt.Printf("  Горутина %d: IsConnected=%v\n", id, db.IsConnected())
        }(i)
    }

    wg.Wait()
    fmt.Printf("  Итог: подключена? %v\n", db.IsConnected())
    fmt.Println()
}

// ╔══════════════════════════════════════════════════════════╗
// ║  5. SYNC/ATOMIC — атомарные операции                   ║
// ╚══════════════════════════════════════════════════════════╝

func demoAtomic() {
    fmt.Println("── 5. ATOMIC ──")

    // Атомарные операции БЫСТРЕЕ мьютексов.
    // Подходят для простых счётчиков, флагов, указателей.
    // Для сложных структур — используй мьютекс.

    var counter int64 // Специальный тип для atomic
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 100; j++ {
                atomic.AddInt64(&counter, 1) // Атомарный инкремент
            }
        }()
    }

    wg.Wait()
    fmt.Printf("  Atomic counter: %d (ожидаем 100000)\n", counter)

    // CompareAndSwap — атомарное "если равно, то заменить"
    var flag int32
    fmt.Printf("  flag до CAS: %d\n", flag)
    swapped := atomic.CompareAndSwapInt32(&flag, 0, 1) // если flag==0, установить 1
    fmt.Printf("  CAS(0→1): успех=%v, flag=%d\n", swapped, flag)
    swapped = atomic.CompareAndSwapInt32(&flag, 0, 2) // если flag==0, установить 2
    fmt.Printf("  CAS(0→2): успех=%v, flag=%d\n", swapped, flag)

    // Load/Store — атомарное чтение/запись
    atomic.StoreInt64(&counter, 42)
    val := atomic.LoadInt64(&counter)
    fmt.Printf("  Load после Store: %d\n", val)
    fmt.Println()
}

// ╔══════════════════════════════════════════════════════════╗
// ║  6. ERRGROUP — группа горутин с ошибками               ║
// ╚══════════════════════════════════════════════════════════╝

func fetchUser(ctx context.Context, id int) (string, error) {
    // Имитация запроса
    delay := time.Duration(rand.Intn(200)) * time.Millisecond
    select {
    case <-ctx.Done():
        return "", ctx.Err()
    case <-time.After(delay):
        if id == 3 {
            return "", fmt.Errorf("user %d not found", id)
        }
        return fmt.Sprintf("User-%d", id), nil
    }
}

func demoErrGroup() {
    fmt.Println("── 6. ERRGROUP ──")

    // errgroup.Group — как sync.WaitGroup, но:
    // 1. Останавливается при ПЕРВОЙ ошибке
    // 2. Передаёт контекст с отменой
    // 3. Собирает ошибки

    g, ctx := errgroup.WithContext(context.Background())

    users := make([]string, 5) // Результаты

    for i := 1; i <= 5; i++ {
        i := i // Захват переменной для замыкания
        g.Go(func() error {
            user, err := fetchUser(ctx, i)
            if err != nil {
                return fmt.Errorf("fetch user %d: %w", i, err)
            }
            users[i-1] = user
            return nil
        })
    }

    // Ожидаем завершения всех горутин
    if err := g.Wait(); err != nil {
        fmt.Printf("  Ошибка: %v\n", err)
        fmt.Printf("  Контекст отменён: %v\n", ctx.Err())
    } else {
        fmt.Printf("  Все пользователи загружены: %v\n", users)
    }

    fmt.Println()
}

// ╔══════════════════════════════════════════════════════════╗
// ║  7. SEMAPHORE — ограничение параллелизма               ║
// ╚══════════════════════════════════════════════════════════╝

func demoSemaphore() {
    fmt.Println("── 7. SEMAPHORE (ограничение параллелизма) ──")

    // Ограничиваем одновременные запросы до 3
    sem := semaphore.NewWeighted(3)
    ctx := context.Background()
    var wg sync.WaitGroup

    start := time.Now()

    for i := 1; i <= 9; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()

            // Захватываем слот семафора
            if err := sem.Acquire(ctx, 1); err != nil {
                fmt.Printf("  Ошибка захвата семафора: %v\n", err)
                return
            }
            defer sem.Release(1) // Освобождаем слот

            // Работа
            fmt.Printf("  Запрос %d: выполняю (слотов занято: %d)...\n",
                id, 3-sem.Weighted(len(sem)))
            time.Sleep(300 * time.Millisecond)
            fmt.Printf("  Запрос %d: завершён\n", id)
        }(i)
    }

    wg.Wait()
    fmt.Printf("  Все запросы выполнены за %v\n", time.Since(start))
    fmt.Println()
}

// ╔══════════════════════════════════════════════════════════╗
// ║  8. DATA RACE — демонстрация и обнаружение             ║
// ╚══════════════════════════════════════════════════════════╝

func demoDataRace() {
    fmt.Println("── 8. DATA RACE (обнаружение) ──")
    fmt.Println("  Запусти с флагом -race для обнаружения гонок:")
    fmt.Println("    go run -race main.go")
    fmt.Println()

    // НЕПРАВИЛЬНЫЙ код с гонкой данных (закомментирован):
    // var counter int
    // for i := 0; i < 1000; i++ {
    //     go func() { counter++ }() // DATA RACE!
    // }

    fmt.Println("  Правильные способы избежать data race:")
    fmt.Println("    1. sync.Mutex — для сложных структур")
    fmt.Println("    2. atomic.AddInt64 — для простых счётчиков")
    fmt.Println("    3. Каналы — передача данных вместо общего доступа")
    fmt.Println("    4. sync.RWMutex — когда больше читают, чем пишут")
    fmt.Println()
}

// ╔══════════════════════════════════════════════════════════╗
// ║  9. СРАВНЕНИЕ ПОДХОДОВ К СИНХРОНИЗАЦИИ                ║
// ╚══════════════════════════════════════════════════════════╝

func demoComparison() {
    fmt.Println("── 9. ЧТО ВЫБРАТЬ? ──")
    fmt.Println("  ┌─────────────────────┬──────────────────────────────────┐")
    fmt.Println("  │ Ситуация            │ Инструмент                       │")
    fmt.Println("  ├─────────────────────┼──────────────────────────────────┤")
    fmt.Println("  │ Простой счётчик     │ atomic.AddInt64                  │")
    fmt.Println("  │ Сложная структура   │ sync.Mutex                      │")
    fmt.Println("  │ Много читателей     │ sync.RWMutex                    │")
    fmt.Println("  │ Ожидание группы     │ sync.WaitGroup                  │")
    fmt.Println("  │ Однократный запуск  │ sync.Once                       │")
    fmt.Println("  │ Группа с ошибками   │ errgroup.Group                  │")
    fmt.Println("  │ Ограничение конк-ти │ semaphore.Weighted              │")
    fmt.Println("  │ Передача данных     │ chan (вместо мьютекса)          │")
    fmt.Println("  └─────────────────────┴──────────────────────────────────┘")
    fmt.Println()
}

// ╔══════════════════════════════════════════════════════════╗
// ║  MAIN                                                    ║
// ╚══════════════════════════════════════════════════════════╝

func main() {
    fmt.Println("╔══════════════════════════════════════════╗")
    fmt.Println("║   СИНХРОНИЗАЦИЯ: МЬЮТЕКСЫ, ATOMIC,     ║")
    fmt.Println("║   WAITGROUP, ERRGROUP                  ║")
    fmt.Println("╚══════════════════════════════════════════╝")

    demoMutex()
    demoRWMutex()
    demoWaitGroup()
    demoOnce()
    demoAtomic()
    demoErrGroup()
    demoSemaphore()
    demoDataRace()
    demoComparison()

    fmt.Println("✅ Демонстрация завершена!")
}

🧠 Визуализация: мьютекс vs RWMutex

MUTEX (sync.Mutex): ┌────────────────────────────────────────────────────────┐ │ Горутина 1: Lock() ──── работа ──── Unlock() │ │ Горутина 2: ЖДЁТ… Lock() ── работа │ │ Горутина 3: ЖДЁТ… ЖДЁТ… Lock() │ │ Только ОДНА горутина в критической секции. │ └────────────────────────────────────────────────────────┘

RWMUTEX (sync.RWMutex): ┌────────────────────────────────────────────────────────┐ │ Читатель 1: RLock() ─── чтение ─── RUnlock() │ │ Читатель 2: RLock() ─── чтение ─── RUnlock() ← ВМЕСТЕ│ │ Читатель 3: RLock() ─── чтение ─── RUnlock() │ │ Писатель: ЖДЁТ окончания ВСЕХ читателей │ │ Lock() ── запись ── Unlock() │ │ МНОГО читателей одновременно, писатель — эксклюзивно. │ └────────────────────────────────────────────────────────┘

DON’T COMMUNICATE BY SHARING MEMORY; SHARE MEMORY BY COMMUNICATING. (Не делись памятью — передавай данные через каналы.)

📊 Инструменты синхронизации

ИнструментНазначениеПроизводительностьСложность
sync.MutexВзаимное исключениеСредняяНизкая
sync.RWMutexМного читателей, мало писателейВысокая для чтенияНизкая
sync.WaitGroupОжидание группы горутинВысокаяНизкая
sync.OnceОднократное выполнениеВысокаяНизкая
atomicАтомарные операцииОчень высокаяСредняя
chanПередача данных + синхронизацияВысокаяСредняя
errgroupГруппа горутин с ошибкамиВысокаяНизкая
semaphoreОграничение параллелизмаВысокаяНизкая

🚀 Запуск программы

# Обычный запуск
go run main.go

# Запуск с детектором гонок (ОБЯЗАТЕЛЬНО для production-кода!)
go run -race main.go

# Сборка с race-детектором
go build -race -o sync-demo main.go
./sync-demo

# Тестирование с race-детектором
go test -race ./...
⚠️ Типичные ошибки:
💡 Практический совет:

💡 Best practices от сеньоров:

🔑 Ключевые концепции

Что нужно запомнить из этого урока:

💡 Для Node.js разработчика:

← Предыдущий урок Следующий урок →