· 10 мин 👁 1.7k Средний

Язык Go — каналы каналов

Канал как first-class value в Go — передача канала внутри структуры запроса, паттерн ответного канала и основа для RPC без единого мьютекса.

каналыгорутиныRPCдемультиплексированиеконкурентность
Содержание

Канал в Go — first-class value. Это означает что канал можно передавать в функцию, хранить в структуре и даже отправлять через другой канал. На этом свойстве строится один из самых элегантных паттернов конкурентного Go.

Канал как обычное значение

В большинстве языков примитивы синхронизации — особые объекты с особым обращением. В Go канал ничем не отличается от int или указателя с точки зрения передачи:

// канал можно передать в функцию
func readFrom(ch <-chan int) { ... }

// канал можно положить в структуру
type Worker struct {
    jobs chan Job
    done chan struct{}
}

// канал можно отправить через другой канал
dispatcher := make(chan chan int) // канал каналов

Именно это позволяет клиенту и серверу общаться без общей памяти.

Паттерн ответного канала

Классическая задача: клиент отправляет запрос серверу и хочет получить ответ. Без каналов — нужен мьютекс, общий map или какой-то реестр ответов. С каналами — клиент просто кладёт в запрос свой канал для ответа:

// Request содержит всё необходимое для выполнения и возврата результата:
// — аргументы
// — функцию которую нужно применить
// — канал куда сервер отправит ответ
type Request struct {
    args       []int
    f          func([]int) int
    resultChan chan int // ответный канал: принадлежит клиенту
}

Каждый клиент создаёт свой resultChan — ответы не перемешаются.

Клиентская сторона

// вспомогательная функция для демонстрации
func sum(a []int) (s int) {
    for _, v := range a {
        s += v
    }
    return
}

// клиент формирует запрос и кладёт в него свой канал для ответа
request := &Request{
    args:       []int{3, 4, 5},
    f:          sum,
    resultChan: make(chan int), // создаём канал специально под этот запрос
}

clientRequests <- request // отправляем запрос серверу

// блокируемся именно на своём канале — чужие ответы не придут
fmt.Printf("answer: %d\n", <-request.resultChan) // answer: 12

Два клиента работают независимо — у каждого свой resultChan, ответы не пересекаются:

req1 := &Request{[]int{1, 2, 3}, sum, make(chan int)}
req2 := &Request{[]int{10, 20, 30}, sum, make(chan int)}

clientRequests <- req1
clientRequests <- req2

// каждый ждёт ответа на своём канале — порядок получения не важен
fmt.Println(<-req1.resultChan) // 6
fmt.Println(<-req2.resultChan) // 60

Серверная сторона

Сервер не знает сколько клиентов и не хранит никакого состояния — просто выполняет функцию и отправляет результат в канал из запроса:

func handle(queue chan *Request) {
    for req := range queue {            // читаем запросы пока канал не закрыт
        req.resultChan <- req.f(req.args) // вычисляем и отправляем ответ клиенту
    }
}

Запускаем пул воркеров из предыдущей статьи:

const MaxOutstanding = 10

func Serve(clientRequests chan *Request, quit chan struct{}) {
    for i := 0; i < MaxOutstanding; i++ {
        go handle(clientRequests) // каждый воркер читает из общей очереди
    }
    <-quit // ждём сигнала завершения
}

Почему это работает без мьютексов

Разберём по шагам что происходит с данными:

клиент 1 ──► clientRequests ──► воркер A ──► req1.resultChan ──► клиент 1
клиент 2 ──► clientRequests ──► воркер B ──► req2.resultChan ──► клиент 2
клиент 3 ──► clientRequests ──► воркер A ──► req3.resultChan ──► клиент 3

Каждое значение в каждый момент принадлежит ровно одной горутине:

  • запрос лежит в clientRequests — никто его не трогает пока воркер не заберёт
  • воркер взял запрос — только он имеет к нему доступ
  • воркер отправил результат в resultChan — дальше данные принадлежат клиенту

Нечего защищать мьютексом — нет разделяемого состояния.

Расширение: таймаут на ответ

Ответный канал легко комбинируется с select для ограничения времени ожидания:

request := &Request{
    args:       []int{3, 4, 5},
    f:          sum,
    resultChan: make(chan int, 1), // буфер 1: сервер не заблокируется если клиент ушёл по таймауту
}

clientRequests <- request

select {
case result := <-request.resultChan:
    fmt.Println("получили:", result)
case <-time.After(5 * time.Second):
    fmt.Println("сервер не ответил вовремя")
}

Буфер размером 1 в resultChan — важная деталь: если клиент уйдёт по таймауту, сервер всё равно сможет отправить результат и не заблокируется навсегда.

Итоги

  • Канал — first-class value: передаётся в функции, хранится в структурах, отправляется через другие каналы
  • Паттерн ответного канала: клиент кладёт в запрос свой chan — сервер отправляет ответ именно туда
  • Каждый клиент создаёт свой канал — ответы не перемешиваются, мьютексы не нужны
  • Данные в каждый момент принадлежат ровно одной горутине — это и есть «общайся через передачу данных»
  • При таймауте делайте буфер 1 в ответном канале — иначе сервер заблокируется на отправке