· 13 мин 👁 1.5k Начинающий

Язык Go — каналы

Каналы в Go — создание, буферизованные и небуферизованные, семафор через канал, паттерны управления горутинами и сигнал завершения.

каналыгорутинысинхронизациясемафорконкурентность
Содержание

Каналы — основной инструмент общения между горутинами в Go. Через них передаются данные и сигналы, через них горутины синхронизируются.

Создание канала

Канал создаётся через make — как map. Результат является ссылкой на внутреннюю структуру данных:

ci := make(chan int)           // небуферизованный канал целых чисел
cj := make(chan int, 0)        // то же самое: размер буфера 0 — по умолчанию
cs := make(chan *os.File, 100) // буферизованный канал: буфер на 100 указателей

Разница между буферизованным и небуферизованным каналом — в том, когда блокируется отправитель.

Небуферизованный канал: синхронизация через обмен

Небуферизованный канал объединяет два действия в одно: передачу значения и синхронизацию. Отправитель блокируется до тех пор, пока получатель не примет значение — и наоборот. В момент обмена обе стороны гарантированно находятся в известном состоянии.

Практический пример — дождаться завершения фоновой сортировки:

c := make(chan int) // небуферизованный канал — синхронизация без буфера

go func() {
    list.Sort()
    c <- 1 // сортировка завершена — отправляем сигнал (значение не важно)
}()

doSomethingForAWhile() // делаем что-то полезное пока идёт сортировка

<-c // блокируемся здесь до получения сигнала; само значение отбрасываем

Получатель всегда блокируется пока в канале нет данных. Отправитель в небуферизованном канале блокируется пока получатель не принял значение.

Буферизованный канал: отправитель не ждёт получателя

Если у канала есть буфер — отправитель блокируется только когда буфер заполнен. Пока есть место — запись проходит немедленно:

ch := make(chan int, 3) // буфер на 3 элемента

ch <- 1 // не блокируется — буфер пуст
ch <- 2 // не блокируется — в буфере 1 элемент
ch <- 3 // не блокируется — в буфере 2 элемента
ch <- 4 // блокируется — буфер заполнен, ждём получателя

fmt.Println(<-ch) // 1 — читаем, освобождаем место в буфере

Буферизованный канал как семафор

Семафор — классический инструмент для ограничения количества одновременно выполняемых операций. Буферизованный канал ведёт себя как семафор: его ёмкость задаёт максимум параллельных выполнений.

var sem = make(chan int, MaxOutstanding) // ёмкость = максимум параллельных обработчиков

func handle(r *Request) {
    sem <- 1    // захватываем слот: если канал заполнен — ждём освобождения
    process(r)  // выполняем запрос (может занять долгое время)
    <-sem       // освобождаем слот: следующий ожидающий может войти
}

func Serve(queue chan *Request) {
    for {
        req := <-queue
        go handle(req) // запускаем горутину для каждого запроса, не ждём завершения
    }
}

Пока MaxOutstanding обработчиков выполняют process, все остальные блокируются на sem <- 1 — ждут пока кто-то не освободит слот через <-sem.

У этого решения есть проблема: Serve создаёт новую горутину на каждый входящий запрос — даже если все слоты заняты. При высокой нагрузке горутины накапливаются, каждая висит на sem <- 1 и потребляет память.

Ограничение создания горутин через семафор

Исправление: переносим захват семафора до создания горутины — тогда новые горутины не создаются пока есть свободные слоты:

func Serve(queue chan *Request) {
    for req := range queue { // range по каналу: читаем запросы до закрытия очереди
        sem <- 1 // захватываем слот ДО запуска горутины — блокируемся если лимит исчерпан
        go func() {
            // shadowing: req := req нужен в Go < 1.22,
            // иначе все горутины захватят одну переменную req из цикла
            process(req)
            <-sem // освобождаем слот после завершения
        }()
    }
}

Теперь количество горутин не превышает MaxOutstanding — новые не создаются пока старые не завершатся.

Пул воркеров: фиксированное число горутин

Ещё лучший подход — запустить ровно MaxOutstanding горутин-воркеров, которые читают из общего канала запросов. Не нужен семафор, не нужно думать о лимитах — количество воркеров само по себе ограничивает параллелизм:

func handle(queue chan *Request) {
    for r := range queue { // каждый воркер читает из канала пока тот не закрыт
        process(r)
    }
}

func Serve(clientRequests chan *Request, quit chan bool) {
    // запускаем ровно MaxOutstanding воркеров — не больше и не меньше
    for i := 0; i < MaxOutstanding; i++ {
        go handle(clientRequests) // все воркеры читают из одного канала
    }
    <-quit // блокируемся до сигнала завершения
}

Использование:

requests := make(chan *Request, 100) // буферизованная очередь запросов
quit := make(chan bool)

go Serve(requests, quit)

// отправляем запросы
requests <- &Request{...}

// сигнал завершения
quit <- true

Три преимущества паттерна пула воркеров перед семафором:

  • количество горутин фиксировано с самого старта — никакого роста при нагрузке
  • каждый воркер живёт всё время работы сервера — нет накладных расходов на создание и уничтожение горутин
  • логика проще: нет захвата и освобождения семафора

Направленность канала

Канал можно ограничить до «только для записи» или «только для чтения» — это помогает компилятору поймать ошибки:

func producer(out chan<- int) { // chan<- : только запись
    out <- 42
    // <-out  // ошибка компиляции: нельзя читать из chan<-
}

func consumer(in <-chan int) { // <-chan : только чтение
    val := <-in
    // in <- 1  // ошибка компиляции: нельзя писать в <-chan
    fmt.Println(val)
}

func main() {
    ch := make(chan int) // двунаправленный канал
    go producer(ch)     // неявно конвертируется в chan<-
    consumer(ch)        // неявно конвертируется в <-chan
}

В сигнатурах функций всегда указывайте направленность — это документация и защита от ошибок одновременно.

Закрытие канала как сигнал

Закрытый канал сразу возвращает нулевое значение при чтении. Это используется как широковещательный сигнал — одно закрытие разблокирует сразу всех получателей:

done := make(chan struct{}) // struct{} — нулевой размер, сигнал без данных

// запускаем несколько воркеров
for i := 0; i < 3; i++ {
    go func(id int) {
        select {
        case <-done: // все три воркера разблокируются одновременно при закрытии
            fmt.Printf("воркер %d завершился\n", id)
        }
    }(i)
}

time.Sleep(time.Second)
close(done) // один вызов — все воркеры получают сигнал
time.Sleep(100 * time.Millisecond)
// воркер 0 завершился
// воркер 2 завершился
// воркер 1 завершился

Отправка значения в канал разблокирует только одного получателя. Закрытие канала разблокирует всех.

Итоги

  • make(chan T) — небуферизованный канал; make(chan T, n) — буфер на n элементов
  • Небуферизованный канал синхронизирует: отправитель ждёт получателя, получатель ждёт отправителя
  • Буферизованный канал блокирует отправителя только при заполненном буфере
  • Буферизованный канал как семафор: ёмкость = максимум параллельных операций
  • Захватывайте семафор до запуска горутины, иначе горутины накапливаются при нагрузке
  • Пул воркеров — чище семафора: фиксированное число горутин читает из общего канала
  • Направленность chan<- и <-chan — указывайте в сигнатурах функций
  • Закрытие канала разблокирует всех получателей сразу — используйте как широковещательный сигнал