Урок 8: Конкурентность — горутины и каналы

Урок 8. Конкурентность — горутины и каналы

🔄 Node.js → Go (ключевые отличия):

📋 Что изучаем

📦 Инициализация проекта

mkdir go-concurrency && cd go-concurrency
go mod init go-concurrency

💻 Код программы

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// ╔══════════════════════════════════════════════════════════╗
// ║  1. ГОРУТИНЫ — ЗАПУСК                                  ║
// ╚══════════════════════════════════════════════════════════╝

func demoGoroutines() {
    fmt.Println("── 1. ГОРУТИНЫ ──")

    // Горутина запускается ключевым словом go.
    // Это НЕ поток ОС — это легковесный поток внутри рантайма Go.
    // Стек начинается с ~2 КБ и растёт/сжимается динамически.

    // Запуск горутины
    go func() {
        fmt.Println("  Привет из горутины!")
    }()

    // Главная горутина (main) тоже горутина.
    // Если main завершится, ВСЕ горутины будут убиты.
    // Поэтому даём время на выполнение:
    time.Sleep(50 * time.Millisecond)

    // Демонстрация параллельного выполнения
    fmt.Println("  Запуск 5 горутин:")
    for i := 1; i <= 5; i++ {
        // ВАЖНО: передаём i как аргумент!
        // Если использовать i напрямую — замыкание захватит
        // ПЕРЕМЕННУЮ, а не значение. Все горутины увидят одно и то же i.
        go func(id int) {
            fmt.Printf("    Горутина %d\n", id)
        }(i)
    }
    time.Sleep(50 * time.Millisecond)
    fmt.Println()
}

// ╔══════════════════════════════════════════════════════════╗
// ║  2. КАНАЛЫ — СОЗДАНИЕ И БАЗОВЫЕ ОПЕРАЦИИ              ║
// ╚══════════════════════════════════════════════════════════╝

func demoChannels() {
    fmt.Println("── 2. КАНАЛЫ ──")

    // Канал — типизированная очередь для передачи данных между горутинами.
    // Аналог: потокобезопасная очередь + EventEmitter + backpressure.

    // Небуферизированный канал: отправитель БЛОКИРУЕТСЯ,
    // пока получатель не прочитает.
    unbuffered := make(chan string)

    go func() {
        time.Sleep(100 * time.Millisecond)
        unbuffered <- "данные" // Блокируется, пока main не прочитает
        fmt.Println("  Отправитель: данные отправлены")
    }()

    msg := <-unbuffered // Блокируется, пока горутина не отправит
    fmt.Printf("  Получатель: получил %q\n", msg)

    // Буферизированный канал: отправитель НЕ блокируется,
    // пока буфер не заполнен.
    buffered := make(chan int, 3) // Буфер на 3 элемента

    buffered <- 1 // Не блокируется
    buffered <- 2 // Не блокируется
    buffered <- 3 // Не блокируется
    // buffered <- 4 // ЗАБЛОКИРУЕТСЯ — буфер полон!

    fmt.Printf("  Буферизированный канал: %d %d %d\n",
        <-buffered, <-buffered, <-buffered)

    // Закрытие канала: close(ch)
    // После закрытия:
    // - Чтение возвращает zero value + false
    // - Запись в закрытый канал → ПАНИКА
    // - Чтение из закрытого пустого канала → сразу zero value

    ch := make(chan int, 2)
    ch <- 10
    ch <- 20
    close(ch)

    // Чтение из закрытого канала
    val, ok := <-ch
    fmt.Printf("  Закрытый канал: val=%d ok=%v\n", val, ok) // 10 true
    val, ok = <-ch
    fmt.Printf("  Закрытый канал: val=%d ok=%v\n", val, ok) // 20 true
    val, ok = <-ch
    fmt.Printf("  Закрытый канал: val=%d ok=%v\n", val, ok) // 0 false

    // Range по каналу — читает до закрытия
    ch2 := make(chan int, 3)
    go func() {
        for i := 1; i <= 3; i++ {
            ch2 <- i
        }
        close(ch2) // Обязательно закрыть, иначе range будет висеть вечно!
    }()

    fmt.Print("  Range по каналу: ")
    for val := range ch2 {
        fmt.Printf("%d ", val)
    }
    fmt.Println("\n")
}

// ╔══════════════════════════════════════════════════════════╗
// ║  3. НАПРАВЛЕННЫЕ КАНАЛЫ                                ║
// ╚══════════════════════════════════════════════════════════╝

// producer — может ТОЛЬКО отправлять в канал (chan<-)
func producer(out chan<- int, n int) {
    for i := 1; i <= n; i++ {
        out <- i
    }
    close(out) // Закрывает канал, сигнализируя о завершении
}

// consumer — может ТОЛЬКО читать из канала (<-chan)
func consumer(in <-chan int, done chan<- bool) {
    sum := 0
    for val := range in {
        sum += val
    }
    fmt.Printf("  Сумма: %d\n", sum)
    done <- true
}

func demoDirectionalChannels() {
    fmt.Println("── 3. НАПРАВЛЕННЫЕ КАНАЛЫ ──")

    ch := make(chan int, 5)
    done := make(chan bool)

    // Направленные каналы — защита от ошибок на этапе компиляции
    go producer(ch, 10) // ch преобразуется в chan<- int
    go consumer(ch, done) // ch преобразуется в <-chan int

    <-done
    fmt.Println()
}

// ╔══════════════════════════════════════════════════════════╗
// ║  4. SELECT — МУЛЬТИПЛЕКСИРОВАНИЕ                       ║
// ╚══════════════════════════════════════════════════════════╝

func demoSelect() {
    fmt.Println("── 4. SELECT ──")

    // select — как switch, но для каналов.
    // Выполняет ТУ ветку, чей канал готов первым.
    // Если готовы несколько — выбирает СЛУЧАЙНЫЙ.

    ch1 := make(chan string)
    ch2 := make(chan string)

    // Горутина 1: отправляет с задержкой
    go func() {
        time.Sleep(100 * time.Millisecond)
        ch1 <- "из канала 1 (медленный)"
    }()

    // Горутина 2: отправляет с меньшей задержкой
    go func() {
        time.Sleep(50 * time.Millisecond)
        ch2 <- "из канала 2 (быстрый)"
    }()

    // select ждёт первый готовый канал
    for i := 0; i < 2; i++ {
        select {
        case msg := <-ch1:
            fmt.Printf("  Получено: %s\n", msg)
        case msg := <-ch2:
            fmt.Printf("  Получено: %s\n", msg)
        case <-time.After(200 * time.Millisecond):
            // time.After — канал, который закроется через указанное время
            fmt.Println("  Таймаут!")
        }
    }

    // default в select — неблокирующая операция
    ch := make(chan int)
    select {
    case val := <-ch:
        fmt.Printf("  Получено: %d\n", val)
    default:
        fmt.Println("  Канал пуст (default)")
    }

    fmt.Println()
}

// ╔══════════════════════════════════════════════════════════╗
// ║  5. ПАТТЕРНЫ: FAN-OUT, FAN-IN, PIPELINE               ║
// ╚══════════════════════════════════════════════════════════╝

// generator — генерирует числа в канал (источник)
func generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

// square — возводит в квадрат (этап пайплайна)
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

// merge — fan-in: объединяет несколько каналов в один
func merge(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Запускаем горутину для каждого входного канала
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            out <- n
        }
    }

    wg.Add(len(channels))
    for _, c := range channels {
        go output(c)
    }

    // Закрываем out, когда все input-горутины завершены
    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

func demoPatterns() {
    fmt.Println("── 5. ПАТТЕРНЫ: PIPELINE, FAN-OUT, FAN-IN ──")

    // Pipeline: generator → square → вывод
    fmt.Print("  Pipeline (gen → square): ")
    for n := range square(generator(1, 2, 3, 4, 5)) {
        fmt.Printf("%d ", n)
    }
    fmt.Println()

    // Fan-out + Fan-in
    // Распределяем работу по двум square, затем объединяем
    in := generator(2, 4, 6, 8, 10)

    // Fan-out: два обработчика читают из одного канала
    sq1 := square(in)
    sq2 := square(in) // Оба читают из in (разные значения — round-robin)

    // Fan-in: объединяем результаты
    fmt.Print("  Fan-out/in (2 workers): ")
    for n := range merge(sq1, sq2) {
        fmt.Printf("%d ", n)
    }
    fmt.Println("\n")
}

// ╔══════════════════════════════════════════════════════════╗
// ║  6. WORKER POOL (пул воркеров)                         ║
// ╚══════════════════════════════════════════════════════════╝

func worker(id int, jobs <-chan int, results chan<- string) {
    for job := range jobs {
        // Имитация работы
        workTime := time.Duration(rand.Intn(300)) * time.Millisecond
        time.Sleep(workTime)
        results <- fmt.Sprintf("Worker %d обработал задачу %d за %v", id, job, workTime)
    }
}

func demoWorkerPool() {
    fmt.Println("── 6. WORKER POOL ──")

    const numJobs = 10
    const numWorkers = 3

    jobs := make(chan int, numJobs)
    results := make(chan string, numJobs)

    // Запуск воркеров
    for w := 1; w <= numWorkers; w++ {
        go worker(w, jobs, results)
    }

    // Отправка задач
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs) // Закрываем канал задач → воркеры завершатся

    // Сбор результатов
    for r := 1; r <= numJobs; r++ {
        fmt.Println("  ", <-results)
    }

    fmt.Println()
}

// ╔══════════════════════════════════════════════════════════╗
// ║  7. ТАЙМЕРЫ И ТИКЕРЫ                                   ║
// ╚══════════════════════════════════════════════════════════╝

func demoTimers() {
    fmt.Println("── 7. ТАЙМЕРЫ И ТИКЕРЫ ──")

    // time.After — канал, который вернёт значение через заданное время
    // Аналог setTimeout
    fmt.Print("  Ожидание 100ms...")
    <-time.After(100 * time.Millisecond)
    fmt.Println(" готово!")

    // time.NewTimer — таймер, который можно остановить
    timer := time.NewTimer(200 * time.Millisecond)
    go func() {
        <-timer.C
        fmt.Println("  Таймер сработал")
    }()

    // Передумали — останавливаем
    if timer.Stop() {
        fmt.Println("  Таймер остановлен до срабатывания")
    }

    // time.NewTicker — периодические события (аналог setInterval)
    ticker := time.NewTicker(50 * time.Millisecond)
    defer ticker.Stop()

    done := make(chan bool)
    go func() {
        time.Sleep(200 * time.Millisecond)
        done <- true
    }()

    fmt.Print("  Тикер: ")
    loop:
    for {
        select {
        case t := <-ticker.C:
            fmt.Printf("[%s] ", t.Format("15:04:05.000"))
        case <-done:
            break loop
        }
    }
    fmt.Println("\n")
}

// ╔══════════════════════════════════════════════════════════╗
// ║  8. ДЕДЛОК (для демонстрации — ЗАКОММЕНТИРОВАН)       ║
// ╚══════════════════════════════════════════════════════════╝

func deadlockDemo() {
    fmt.Println("── 8. ДЕДЛОК (пример ошибки) ──")
    fmt.Println("  Следующий код вызовет deadlock:")
    fmt.Println("    ch := make(chan int)")
    fmt.Println("    ch <- 1  // БЛОКИРУЕТСЯ навсегда — никто не читает")
    fmt.Println("    // fatal error: all goroutines are asleep - deadlock!")
    fmt.Println()
}

// ╔══════════════════════════════════════════════════════════╗
// ║  MAIN                                                    ║
// ╚══════════════════════════════════════════════════════════╝

func main() {
    fmt.Println("╔══════════════════════════════════════════╗")
    fmt.Println("║   КОНКУРЕНТНОСТЬ: ГОРУТИНЫ И КАНАЛЫ   ║")
    fmt.Println("╚══════════════════════════════════════════╝")

    demoGoroutines()
    demoChannels()
    demoDirectionalChannels()
    demoSelect()
    demoPatterns()
    demoWorkerPool()
    demoTimers()
    deadlockDemo()

    fmt.Println("✅ Демонстрация завершена!")
}

🧠 Визуализация: горутины, каналы и планировщик

┌─────────────────────────────────────────────────────────────┐
│                    RUNTIME GO                               │
│                                                             │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐                 │
│  │ Gорутина1 │  │ Gорутина2 │  │ Gорутина3 │  ... (100K+)  │
│  │ (main)   │  │ (worker)  │  │ (worker)  │               │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘                 │
│       │             │             │                         │
│       └──────────┬──┴─────────────┘                         │
│                  │                                          │
│       ┌──────────▼──────────┐                               │
│       │   GO SCHEDULER      │  M:N планировщик             │
│       │  (GOMAXPROCS=8)     │  M = горутины, N = потоки ОС │
│       └──────────┬──────────┘                               │
│                  │                                          │
│       ┌──────────▼──────────┐                               │
│       │   ПОТОКИ ОС (8)     │  Настоящие OS threads        │
│       └─────────────────────┘                               │
│                                                             │
│  КАНАЛЫ (chan):                                            │
│                                                             │
│  Gорутина1 ──chan──▶ Gорутина2                             │
│              (данные)                                       │
│                                                             │
│  Небуферизированный: отправитель ЖДЁТ получателя           │
│  Буферизированный: отправитель НЕ ЖДЁТ (пока есть место)   │
│                                                             │
│  SELECT {                                                   │
│    case msg := <-ch1:  // если ch1 готов                   │
│    case msg := <-ch2:  // если ch2 готов                   │
│    case <-timeout:     // если таймаут                     │
│    default:            // если никто не готов (не ждём)    │
│  }                                                          │
└─────────────────────────────────────────────────────────────┘

📊 Каналы: небуферизированные vs буферизированные

ХарактеристикаНебуферизированный make(chan T)Буферизированный make(chan T, N)
ОтправкаБлокируется до чтенияБлокируется только при заполнении
ПолучениеБлокируется до отправкиБлокируется только при пустоте
СинхронизацияГарантирует синхронизациюНе гарантирует
ИспользованиеСигналы, синхронизацияОчереди, worker pool

🚀 Запуск программы

# Запуск
go run main.go

# Сборка
go build -o concurrency-demo main.go
./concurrency-demo

# Детектор гонок (обнаружение data race)
go run -race main.go

# Просмотр количества горутин в процессе
# (добавьте runtime.NumGoroutine() в код)
go run main.go
💡 Практический совет:

⚠️ Типичные ошибки:

💡 Практический совет:

💡 Best practices от сеньоров:

🔑 Ключевые концепции

Что нужно запомнить из этого урока:

💡 Для Node.js разработчика:

← Предыдущий урок Следующий урок →