Как построить конвейер (pipeline) на горутинах и каналах?

Ответ

Конвейер (pipeline) в Go — это последовательность этапов обработки данных, соединенных каналами. Каждый этап выполняется в отдельной горутине. Этот паттерн отлично подходит для потоковой обработки данных.

Основные принципы построения:

  1. Этап (Stage): Функция, которая запускается как горутина. Она принимает данные из входного канала, обрабатывает их и отправляет результат в выходной канал.
  2. Каналы (Channels): Соединяют этапы, передавая данные от одного к другому.

Пример: конвейер для генерации чисел и возведения их в квадрат.

// 1. Этап генерации чисел
func generator(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out) // Важно закрыть канал, чтобы сообщить о завершении
    }()
    return out
}

// 2. Этап возведения в квадрат
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in { // Цикл завершится, когда канал 'in' закроется
            out <- n * n
        }
        close(out)
    }()
    return out
}

func main() {
    // Собираем конвейер
    in := generator(2, 3, 4)
    out := square(in)

    // Читаем результат
    for result := range out {
        fmt.Println(result) // Выведет 4, 9, 16
    }
}

Ключевые моменты и лучшие практики:

  • Закрытие каналов: Каждый этап-отправитель должен закрывать свой выходной канал (close(out)), чтобы сигнализировать следующему этапу о том, что данных больше не будет. Это позволяет for range по каналу корректно завершиться.
  • Направленные каналы: Использование направленных каналов (<-chan для чтения, chan<- для записи) делает код безопаснее и понятнее, предотвращая случайную запись в канал, предназначенный только для чтения, и наоборот.
  • Fan-out / Fan-in: Для распараллеливания одного из этапов конвейера можно использовать паттерн Fan-out (один канал на вход, несколько горутин-обработчиков) и Fan-in (объединение результатов от нескольких горутин в один канал).