async/await + Promise.all → горутины + каналы
Event loop (однопоточный) → M:N планировщик (много горутин на потоки ОС)
EventEmitter → chan (типизированный, с backpressure)
setTimeout → time.After
setInterval → time.NewTicker
AbortController → context.Context (урок 9)
go, легковесность (2-4 КБ стека)chan<- (только запись), <-chan (только чтение)select — мультиплексирование каналовtime.After, time.NewTickermkdir go-concurrency && cd go-concurrency
go mod init go-concurrency
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
// ╔══════════════════════════════════════════════════════════╗
// ║ 1. ГОРУТИНЫ — ЗАПУСК ║
// ╚══════════════════════════════════════════════════════════╝
func demoGoroutines() {
fmt.Println("── 1. ГОРУТИНЫ ──")
// Горутина запускается ключевым словом go.
// Это НЕ поток ОС — это легковесный поток внутри рантайма Go.
// Стек начинается с ~2 КБ и растёт/сжимается динамически.
// Запуск горутины
go func() {
fmt.Println(" Привет из горутины!")
}()
// Главная горутина (main) тоже горутина.
// Если main завершится, ВСЕ горутины будут убиты.
// Поэтому даём время на выполнение:
time.Sleep(50 * time.Millisecond)
// Демонстрация параллельного выполнения
fmt.Println(" Запуск 5 горутин:")
for i := 1; i <= 5; i++ {
// ВАЖНО: передаём i как аргумент!
// Если использовать i напрямую — замыкание захватит
// ПЕРЕМЕННУЮ, а не значение. Все горутины увидят одно и то же i.
go func(id int) {
fmt.Printf(" Горутина %d\n", id)
}(i)
}
time.Sleep(50 * time.Millisecond)
fmt.Println()
}
// ╔══════════════════════════════════════════════════════════╗
// ║ 2. КАНАЛЫ — СОЗДАНИЕ И БАЗОВЫЕ ОПЕРАЦИИ ║
// ╚══════════════════════════════════════════════════════════╝
func demoChannels() {
fmt.Println("── 2. КАНАЛЫ ──")
// Канал — типизированная очередь для передачи данных между горутинами.
// Аналог: потокобезопасная очередь + EventEmitter + backpressure.
// Небуферизированный канал: отправитель БЛОКИРУЕТСЯ,
// пока получатель не прочитает.
unbuffered := make(chan string)
go func() {
time.Sleep(100 * time.Millisecond)
unbuffered <- "данные" // Блокируется, пока main не прочитает
fmt.Println(" Отправитель: данные отправлены")
}()
msg := <-unbuffered // Блокируется, пока горутина не отправит
fmt.Printf(" Получатель: получил %q\n", msg)
// Буферизированный канал: отправитель НЕ блокируется,
// пока буфер не заполнен.
buffered := make(chan int, 3) // Буфер на 3 элемента
buffered <- 1 // Не блокируется
buffered <- 2 // Не блокируется
buffered <- 3 // Не блокируется
// buffered <- 4 // ЗАБЛОКИРУЕТСЯ — буфер полон!
fmt.Printf(" Буферизированный канал: %d %d %d\n",
<-buffered, <-buffered, <-buffered)
// Закрытие канала: close(ch)
// После закрытия:
// - Чтение возвращает zero value + false
// - Запись в закрытый канал → ПАНИКА
// - Чтение из закрытого пустого канала → сразу zero value
ch := make(chan int, 2)
ch <- 10
ch <- 20
close(ch)
// Чтение из закрытого канала
val, ok := <-ch
fmt.Printf(" Закрытый канал: val=%d ok=%v\n", val, ok) // 10 true
val, ok = <-ch
fmt.Printf(" Закрытый канал: val=%d ok=%v\n", val, ok) // 20 true
val, ok = <-ch
fmt.Printf(" Закрытый канал: val=%d ok=%v\n", val, ok) // 0 false
// Range по каналу — читает до закрытия
ch2 := make(chan int, 3)
go func() {
for i := 1; i <= 3; i++ {
ch2 <- i
}
close(ch2) // Обязательно закрыть, иначе range будет висеть вечно!
}()
fmt.Print(" Range по каналу: ")
for val := range ch2 {
fmt.Printf("%d ", val)
}
fmt.Println("\n")
}
// ╔══════════════════════════════════════════════════════════╗
// ║ 3. НАПРАВЛЕННЫЕ КАНАЛЫ ║
// ╚══════════════════════════════════════════════════════════╝
// producer — может ТОЛЬКО отправлять в канал (chan<-)
func producer(out chan<- int, n int) {
for i := 1; i <= n; i++ {
out <- i
}
close(out) // Закрывает канал, сигнализируя о завершении
}
// consumer — может ТОЛЬКО читать из канала (<-chan)
func consumer(in <-chan int, done chan<- bool) {
sum := 0
for val := range in {
sum += val
}
fmt.Printf(" Сумма: %d\n", sum)
done <- true
}
func demoDirectionalChannels() {
fmt.Println("── 3. НАПРАВЛЕННЫЕ КАНАЛЫ ──")
ch := make(chan int, 5)
done := make(chan bool)
// Направленные каналы — защита от ошибок на этапе компиляции
go producer(ch, 10) // ch преобразуется в chan<- int
go consumer(ch, done) // ch преобразуется в <-chan int
<-done
fmt.Println()
}
// ╔══════════════════════════════════════════════════════════╗
// ║ 4. SELECT — МУЛЬТИПЛЕКСИРОВАНИЕ ║
// ╚══════════════════════════════════════════════════════════╝
func demoSelect() {
fmt.Println("── 4. SELECT ──")
// select — как switch, но для каналов.
// Выполняет ТУ ветку, чей канал готов первым.
// Если готовы несколько — выбирает СЛУЧАЙНЫЙ.
ch1 := make(chan string)
ch2 := make(chan string)
// Горутина 1: отправляет с задержкой
go func() {
time.Sleep(100 * time.Millisecond)
ch1 <- "из канала 1 (медленный)"
}()
// Горутина 2: отправляет с меньшей задержкой
go func() {
time.Sleep(50 * time.Millisecond)
ch2 <- "из канала 2 (быстрый)"
}()
// select ждёт первый готовый канал
for i := 0; i < 2; i++ {
select {
case msg := <-ch1:
fmt.Printf(" Получено: %s\n", msg)
case msg := <-ch2:
fmt.Printf(" Получено: %s\n", msg)
case <-time.After(200 * time.Millisecond):
// time.After — канал, который закроется через указанное время
fmt.Println(" Таймаут!")
}
}
// default в select — неблокирующая операция
ch := make(chan int)
select {
case val := <-ch:
fmt.Printf(" Получено: %d\n", val)
default:
fmt.Println(" Канал пуст (default)")
}
fmt.Println()
}
// ╔══════════════════════════════════════════════════════════╗
// ║ 5. ПАТТЕРНЫ: FAN-OUT, FAN-IN, PIPELINE ║
// ╚══════════════════════════════════════════════════════════╝
// generator — генерирует числа в канал (источник)
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// square — возводит в квадрат (этап пайплайна)
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// merge — fan-in: объединяет несколько каналов в один
func merge(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Запускаем горутину для каждого входного канала
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
out <- n
}
}
wg.Add(len(channels))
for _, c := range channels {
go output(c)
}
// Закрываем out, когда все input-горутины завершены
go func() {
wg.Wait()
close(out)
}()
return out
}
func demoPatterns() {
fmt.Println("── 5. ПАТТЕРНЫ: PIPELINE, FAN-OUT, FAN-IN ──")
// Pipeline: generator → square → вывод
fmt.Print(" Pipeline (gen → square): ")
for n := range square(generator(1, 2, 3, 4, 5)) {
fmt.Printf("%d ", n)
}
fmt.Println()
// Fan-out + Fan-in
// Распределяем работу по двум square, затем объединяем
in := generator(2, 4, 6, 8, 10)
// Fan-out: два обработчика читают из одного канала
sq1 := square(in)
sq2 := square(in) // Оба читают из in (разные значения — round-robin)
// Fan-in: объединяем результаты
fmt.Print(" Fan-out/in (2 workers): ")
for n := range merge(sq1, sq2) {
fmt.Printf("%d ", n)
}
fmt.Println("\n")
}
// ╔══════════════════════════════════════════════════════════╗
// ║ 6. WORKER POOL (пул воркеров) ║
// ╚══════════════════════════════════════════════════════════╝
func worker(id int, jobs <-chan int, results chan<- string) {
for job := range jobs {
// Имитация работы
workTime := time.Duration(rand.Intn(300)) * time.Millisecond
time.Sleep(workTime)
results <- fmt.Sprintf("Worker %d обработал задачу %d за %v", id, job, workTime)
}
}
func demoWorkerPool() {
fmt.Println("── 6. WORKER POOL ──")
const numJobs = 10
const numWorkers = 3
jobs := make(chan int, numJobs)
results := make(chan string, numJobs)
// Запуск воркеров
for w := 1; w <= numWorkers; w++ {
go worker(w, jobs, results)
}
// Отправка задач
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs) // Закрываем канал задач → воркеры завершатся
// Сбор результатов
for r := 1; r <= numJobs; r++ {
fmt.Println(" ", <-results)
}
fmt.Println()
}
// ╔══════════════════════════════════════════════════════════╗
// ║ 7. ТАЙМЕРЫ И ТИКЕРЫ ║
// ╚══════════════════════════════════════════════════════════╝
func demoTimers() {
fmt.Println("── 7. ТАЙМЕРЫ И ТИКЕРЫ ──")
// time.After — канал, который вернёт значение через заданное время
// Аналог setTimeout
fmt.Print(" Ожидание 100ms...")
<-time.After(100 * time.Millisecond)
fmt.Println(" готово!")
// time.NewTimer — таймер, который можно остановить
timer := time.NewTimer(200 * time.Millisecond)
go func() {
<-timer.C
fmt.Println(" Таймер сработал")
}()
// Передумали — останавливаем
if timer.Stop() {
fmt.Println(" Таймер остановлен до срабатывания")
}
// time.NewTicker — периодические события (аналог setInterval)
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
done := make(chan bool)
go func() {
time.Sleep(200 * time.Millisecond)
done <- true
}()
fmt.Print(" Тикер: ")
loop:
for {
select {
case t := <-ticker.C:
fmt.Printf("[%s] ", t.Format("15:04:05.000"))
case <-done:
break loop
}
}
fmt.Println("\n")
}
// ╔══════════════════════════════════════════════════════════╗
// ║ 8. ДЕДЛОК (для демонстрации — ЗАКОММЕНТИРОВАН) ║
// ╚══════════════════════════════════════════════════════════╝
func deadlockDemo() {
fmt.Println("── 8. ДЕДЛОК (пример ошибки) ──")
fmt.Println(" Следующий код вызовет deadlock:")
fmt.Println(" ch := make(chan int)")
fmt.Println(" ch <- 1 // БЛОКИРУЕТСЯ навсегда — никто не читает")
fmt.Println(" // fatal error: all goroutines are asleep - deadlock!")
fmt.Println()
}
// ╔══════════════════════════════════════════════════════════╗
// ║ MAIN ║
// ╚══════════════════════════════════════════════════════════╝
func main() {
fmt.Println("╔══════════════════════════════════════════╗")
fmt.Println("║ КОНКУРЕНТНОСТЬ: ГОРУТИНЫ И КАНАЛЫ ║")
fmt.Println("╚══════════════════════════════════════════╝")
demoGoroutines()
demoChannels()
demoDirectionalChannels()
demoSelect()
demoPatterns()
demoWorkerPool()
demoTimers()
deadlockDemo()
fmt.Println("✅ Демонстрация завершена!")
}
┌─────────────────────────────────────────────────────────────┐
│ RUNTIME GO │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Gорутина1 │ │ Gорутина2 │ │ Gорутина3 │ ... (100K+) │
│ │ (main) │ │ (worker) │ │ (worker) │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ └──────────┬──┴─────────────┘ │
│ │ │
│ ┌──────────▼──────────┐ │
│ │ GO SCHEDULER │ M:N планировщик │
│ │ (GOMAXPROCS=8) │ M = горутины, N = потоки ОС │
│ └──────────┬──────────┘ │
│ │ │
│ ┌──────────▼──────────┐ │
│ │ ПОТОКИ ОС (8) │ Настоящие OS threads │
│ └─────────────────────┘ │
│ │
│ КАНАЛЫ (chan): │
│ │
│ Gорутина1 ──chan──▶ Gорутина2 │
│ (данные) │
│ │
│ Небуферизированный: отправитель ЖДЁТ получателя │
│ Буферизированный: отправитель НЕ ЖДЁТ (пока есть место) │
│ │
│ SELECT { │
│ case msg := <-ch1: // если ch1 готов │
│ case msg := <-ch2: // если ch2 готов │
│ case <-timeout: // если таймаут │
│ default: // если никто не готов (не ждём) │
│ } │
└─────────────────────────────────────────────────────────────┘
| Характеристика | Небуферизированный make(chan T) | Буферизированный make(chan T, N) |
|---|---|---|
| Отправка | Блокируется до чтения | Блокируется только при заполнении |
| Получение | Блокируется до отправки | Блокируется только при пустоте |
| Синхронизация | Гарантирует синхронизацию | Не гарантирует |
| Использование | Сигналы, синхронизация | Очереди, worker pool |
# Запуск
go run main.go
# Сборка
go build -o concurrency-demo main.go
./concurrency-demo
# Детектор гонок (обнаружение data race)
go run -race main.go
# Просмотр количества горутин в процессе
# (добавьте runtime.NumGoroutine() в код)
go run main.go
⚠️ Типичные ошибки:
go func() { use(i) }() — все горутины видят ПОСЛЕДНЕЕ значение i. Передавай как аргумент!💡 Best practices от сеньоров:
func(in <-chan) явно говорит: “я только читаю”.💡 Для Node.js разработчика:
select — как Promise.race, но для каналов, с default (неблокирующий).Promise.all — используй sync.WaitGroup или каналы с подсчётом.