Язык Go — каналы
Каналы в Go — создание, буферизованные и небуферизованные, семафор через канал, паттерны управления горутинами и сигнал завершения.
Содержание
Каналы — основной инструмент общения между горутинами в Go. Через них передаются данные и сигналы, через них горутины синхронизируются.
Создание канала
Канал создаётся через make — как map. Результат является ссылкой на внутреннюю структуру данных:
ci := make(chan int) // небуферизованный канал целых чисел
cj := make(chan int, 0) // то же самое: размер буфера 0 — по умолчанию
cs := make(chan *os.File, 100) // буферизованный канал: буфер на 100 указателей
Разница между буферизованным и небуферизованным каналом — в том, когда блокируется отправитель.
Небуферизованный канал: синхронизация через обмен
Небуферизованный канал объединяет два действия в одно: передачу значения и синхронизацию. Отправитель блокируется до тех пор, пока получатель не примет значение — и наоборот. В момент обмена обе стороны гарантированно находятся в известном состоянии.
Практический пример — дождаться завершения фоновой сортировки:
c := make(chan int) // небуферизованный канал — синхронизация без буфера
go func() {
list.Sort()
c <- 1 // сортировка завершена — отправляем сигнал (значение не важно)
}()
doSomethingForAWhile() // делаем что-то полезное пока идёт сортировка
<-c // блокируемся здесь до получения сигнала; само значение отбрасываем
Получатель всегда блокируется пока в канале нет данных. Отправитель в небуферизованном канале блокируется пока получатель не принял значение.
Буферизованный канал: отправитель не ждёт получателя
Если у канала есть буфер — отправитель блокируется только когда буфер заполнен. Пока есть место — запись проходит немедленно:
ch := make(chan int, 3) // буфер на 3 элемента
ch <- 1 // не блокируется — буфер пуст
ch <- 2 // не блокируется — в буфере 1 элемент
ch <- 3 // не блокируется — в буфере 2 элемента
ch <- 4 // блокируется — буфер заполнен, ждём получателя
fmt.Println(<-ch) // 1 — читаем, освобождаем место в буфере
Буферизованный канал как семафор
Семафор — классический инструмент для ограничения количества одновременно выполняемых операций. Буферизованный канал ведёт себя как семафор: его ёмкость задаёт максимум параллельных выполнений.
var sem = make(chan int, MaxOutstanding) // ёмкость = максимум параллельных обработчиков
func handle(r *Request) {
sem <- 1 // захватываем слот: если канал заполнен — ждём освобождения
process(r) // выполняем запрос (может занять долгое время)
<-sem // освобождаем слот: следующий ожидающий может войти
}
func Serve(queue chan *Request) {
for {
req := <-queue
go handle(req) // запускаем горутину для каждого запроса, не ждём завершения
}
}
Пока MaxOutstanding обработчиков выполняют process, все остальные блокируются на sem <- 1 — ждут пока кто-то не освободит слот через <-sem.
У этого решения есть проблема: Serve создаёт новую горутину на каждый входящий запрос — даже если все слоты заняты. При высокой нагрузке горутины накапливаются, каждая висит на sem <- 1 и потребляет память.
Ограничение создания горутин через семафор
Исправление: переносим захват семафора до создания горутины — тогда новые горутины не создаются пока есть свободные слоты:
func Serve(queue chan *Request) {
for req := range queue { // range по каналу: читаем запросы до закрытия очереди
sem <- 1 // захватываем слот ДО запуска горутины — блокируемся если лимит исчерпан
go func() {
// shadowing: req := req нужен в Go < 1.22,
// иначе все горутины захватят одну переменную req из цикла
process(req)
<-sem // освобождаем слот после завершения
}()
}
}
Теперь количество горутин не превышает MaxOutstanding — новые не создаются пока старые не завершатся.
Пул воркеров: фиксированное число горутин
Ещё лучший подход — запустить ровно MaxOutstanding горутин-воркеров, которые читают из общего канала запросов. Не нужен семафор, не нужно думать о лимитах — количество воркеров само по себе ограничивает параллелизм:
func handle(queue chan *Request) {
for r := range queue { // каждый воркер читает из канала пока тот не закрыт
process(r)
}
}
func Serve(clientRequests chan *Request, quit chan bool) {
// запускаем ровно MaxOutstanding воркеров — не больше и не меньше
for i := 0; i < MaxOutstanding; i++ {
go handle(clientRequests) // все воркеры читают из одного канала
}
<-quit // блокируемся до сигнала завершения
}
Использование:
requests := make(chan *Request, 100) // буферизованная очередь запросов
quit := make(chan bool)
go Serve(requests, quit)
// отправляем запросы
requests <- &Request{...}
// сигнал завершения
quit <- true
Три преимущества паттерна пула воркеров перед семафором:
- количество горутин фиксировано с самого старта — никакого роста при нагрузке
- каждый воркер живёт всё время работы сервера — нет накладных расходов на создание и уничтожение горутин
- логика проще: нет захвата и освобождения семафора
Направленность канала
Канал можно ограничить до «только для записи» или «только для чтения» — это помогает компилятору поймать ошибки:
func producer(out chan<- int) { // chan<- : только запись
out <- 42
// <-out // ошибка компиляции: нельзя читать из chan<-
}
func consumer(in <-chan int) { // <-chan : только чтение
val := <-in
// in <- 1 // ошибка компиляции: нельзя писать в <-chan
fmt.Println(val)
}
func main() {
ch := make(chan int) // двунаправленный канал
go producer(ch) // неявно конвертируется в chan<-
consumer(ch) // неявно конвертируется в <-chan
}
В сигнатурах функций всегда указывайте направленность — это документация и защита от ошибок одновременно.
Закрытие канала как сигнал
Закрытый канал сразу возвращает нулевое значение при чтении. Это используется как широковещательный сигнал — одно закрытие разблокирует сразу всех получателей:
done := make(chan struct{}) // struct{} — нулевой размер, сигнал без данных
// запускаем несколько воркеров
for i := 0; i < 3; i++ {
go func(id int) {
select {
case <-done: // все три воркера разблокируются одновременно при закрытии
fmt.Printf("воркер %d завершился\n", id)
}
}(i)
}
time.Sleep(time.Second)
close(done) // один вызов — все воркеры получают сигнал
time.Sleep(100 * time.Millisecond)
// воркер 0 завершился
// воркер 2 завершился
// воркер 1 завершился
Отправка значения в канал разблокирует только одного получателя. Закрытие канала разблокирует всех.
Итоги
make(chan T)— небуферизованный канал;make(chan T, n)— буфер наnэлементов- Небуферизованный канал синхронизирует: отправитель ждёт получателя, получатель ждёт отправителя
- Буферизованный канал блокирует отправителя только при заполненном буфере
- Буферизованный канал как семафор: ёмкость = максимум параллельных операций
- Захватывайте семафор до запуска горутины, иначе горутины накапливаются при нагрузке
- Пул воркеров — чище семафора: фиксированное число горутин читает из общего канала
- Направленность
chan<-и<-chan— указывайте в сигнатурах функций - Закрытие канала разблокирует всех получателей сразу — используйте как широковещательный сигнал