Pipelining with Go
A few days ago, a friend of mine posted a question about concurrency control in Go. Given the code below, the question was, "Can we ensure that doSomething(0) always runs before doSomething(2)?" The answer is "No." As with threads in other languages, the Go scheduler decides when each goroutine runs, so nothing guarantees that goroutine 0 acquires mtx[0] before goroutine 2 does.
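```go
import "sync"

// One mutex per parity: doSomething(0) and doSomething(2) share mtx[0].
var mtx = [2]sync.Mutex{}

func foo() {
	for i := 0; i < 4; i++ {
		go doSomething(i) // the scheduler decides when each goroutine runs
	}
}

func doSomething(i int) {
	mtx[i%2].Lock()
	// do something
	mtx[i%2].Unlock()
}
```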
Though using a mutex for concurrency control is the norm in other languages (e.g., C++), it is usually not the first choice in Go. A mutex solves concurrency problems by synchronizing access to shared memory, whereas the Go FAQ advises: "Do not communicate by sharing memory. Instead, share memory by communicating."
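In that spirit, the friend's original question does have a channel-based answer. The sketch below is an illustration under my own assumptions, not the author's solution: runWithDependencies is a hypothetical helper in which goroutine i waits for a "finished" channel that goroutine i-step closes, so doSomething(0) always completes before doSomething(2) starts, while the even and odd chains still run concurrently.

```go
package main

import (
	"fmt"
	"sync"
)

// runWithDependencies (hypothetical helper) starts one goroutine per index
// 0..n-1. Goroutine i waits until goroutine i-step has finished, so indices in
// the same chain (same i % step) run in order while different chains overlap.
func runWithDependencies(n, step int, fn func(int)) {
	finished := make([]chan struct{}, n) // finished[i] is closed when fn(i) returns
	for i := range finished {
		finished[i] = make(chan struct{})
	}
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func(i int) {
			defer wg.Done()
			if i >= step {
				<-finished[i-step] // wait for the predecessor in this chain
			}
			fn(i)
			close(finished[i]) // release the successor, if any
		}(i)
	}
	wg.Wait()
}

func main() {
	order := make(chan int, 4) // buffered so fn never blocks
	runWithDependencies(4, 2, func(i int) { order <- i })
	close(order)
	for i := range order {
		fmt.Println("ran", i) // 0 always precedes 2, and 1 always precedes 3
	}
}
```

The ordering guarantee comes from the Go memory model: closing a channel is synchronized before any receive that returns because the channel is closed, and a close wakes every waiter at once.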
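If the real requirement is only to keep every other item in order, we can achieve it with the WaitGroup type from the sync package: run a batch of `phase` items concurrently, wait for the whole batch to finish, and only then start the next batch. Item j therefore always finishes before item j+phase starts.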
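```go
import (
	"fmt"
	"sync"
)

var wg sync.WaitGroup

// foo processes input in batches of `phase` items. Items within a batch run
// concurrently, but a batch starts only after the previous one has finished,
// so input[j] always completes before input[j+phase] starts.
func foo(input []int, phase int) {
	for i := 0; i < len(input); i += phase {
		end := i + phase
		if end > len(input) {
			end = len(input) // the last batch may be shorter than `phase`
		}
		wg.Add(end - i)
		for j := i; j < end; j++ {
			go doSomething(input[j])
		}
		wg.Wait() // block until the whole batch is done
	}
}

func doSomething(v int) {
	defer wg.Done()
	// do something
	fmt.Printf("I'm %d\n", v)
}
```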
However, the actual problem seemed to involve an infinite input stream whose items have dependencies (ordering or anything else), and the batch approach above needs a finite slice. That makes it a data-streaming problem, which is functional programming's turf. Admittedly, Go lacks many functional programming features, but we can still solve it with the core of Go's concurrency: goroutines and channels.
In "Concurrency in Go: Tools and Techniques for Developers" (Katherine Cox-Buday, O'Reilly Media), the author writes:
Go’s philosophy on concurrency can be summed up like this: aim for simplicity, use channels when possible, and treat goroutines like a free resource.
So we can treat this as a pipeline-building problem and divide it into the following stages:
- A generator that streams the input.
- A demuxer that splits the input into multiple streams according to the dependency.
- An executor that runs doSomething on each stream in parallel.
- A muxer that merges the results back into a single stream.
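All four stages share one shape: each takes a done channel and an input channel, starts a goroutine that owns its output channel, and returns that channel. A minimal sketch of that shape, using a hypothetical mapStage that applies a function to every int value:

```go
package main

import "fmt"

// mapStage (hypothetical) is one pipeline stage: it reads from in, applies fn,
// and writes to a channel it owns. It stops when in is closed or done is closed.
func mapStage(done <-chan interface{}, in <-chan int, fn func(int) int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out) // the owner closes its output
		for v := range in {
			select {
			case <-done:
				return
			case out <- fn(v):
			}
		}
	}()
	return out
}

func main() {
	done := make(chan interface{})
	defer close(done)

	src := make(chan int)
	go func() {
		defer close(src)
		for i := 1; i <= 3; i++ {
			src <- i
		}
	}()

	// Stages compose because each one's output is the next one's input.
	double := func(v int) int { return v * 2 }
	for v := range mapStage(done, mapStage(done, src, double), double) {
		fmt.Println(v) // 4, then 8, then 12
	}
}
```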
Input Stream Generator
If you're used to a language with generators (such as Python), you might miss them in Go (I do!). But you can build a generator from a goroutine and a channel.
```go
func generator(done <-chan interface{}, input []int) <-chan int {
	var (
		stream = make(chan int)
	)
	go func() {
		defer close(stream) // tell downstream that no more values are coming
		for _, v := range input {
			select {
			case <-done: // the consumer gave up; stop early
				return
			case stream <- v:
			}
		}
	}()
	return stream
}
```
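This stage might look redundant, but it is the basic pattern for streaming with goroutines and channels: the goroutine that writes to a channel owns it and closes it when finished, and every send is guarded by done so the consumer can cancel the stream early.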
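Because the friend's problem involves an infinite input stream, here is a sketch of a generator with no fixed input, assuming the values are simply an increasing counter. counter and take are hypothetical helpers; closing done is what stops the otherwise endless goroutine.

```go
package main

import "fmt"

// counter (hypothetical) emits 0, 1, 2, ... forever, until done is closed.
func counter(done <-chan interface{}) <-chan int {
	stream := make(chan int)
	go func() {
		defer close(stream)
		for i := 0; ; i++ {
			select {
			case <-done:
				return
			case stream <- i:
			}
		}
	}()
	return stream
}

// take (hypothetical) forwards at most n values from in, then closes its output.
func take(done <-chan interface{}, in <-chan int, n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			select {
			case <-done:
				return
			case v, ok := <-in:
				if !ok {
					return
				}
				select {
				case <-done:
					return
				case out <- v:
				}
			}
		}
	}()
	return out
}

func main() {
	done := make(chan interface{})
	defer close(done) // stops counter's goroutine when main returns
	for v := range take(done, counter(done), 5) {
		fmt.Println(v) // 0, 1, 2, 3, 4
	}
}
```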
Demuxing
Next, we split the input stream into multiple streams (channels). Here we simply distribute the items round-robin over the given number of streams, but this is where you would plug in your own dependency rule.
```go
func demux(done <-chan interface{},
	numOut int, inStream <-chan int) []chan int {
	var (
		streams = make([]chan int, numOut)
	)
	// Do not forget to initialize each channel.
	for i := 0; i < numOut; i++ {
		streams[i] = make(chan int)
	}
	go func() {
		defer func() {
			for _, st := range streams {
				close(st)
			}
		}()
		for i := 0; ; {
			select {
			case <-done:
				return
			case val, ok := <-inStream:
				if !ok {
					return
				}
				// Round-robin: item k goes to stream k % numOut. The send is
				// guarded by done too, so this goroutine cannot leak.
				select {
				case <-done:
					return
				case streams[i] <- val:
				}
				i = (i + 1) % numOut
			}
		}
	}()
	return streams
}
```
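To bring a real dependency into demux, one option is to route by a key instead of round-robin: items that share a key always land in the same stream, so they stay ordered relative to each other. A sketch under that assumption; demuxByKey, its keyFn parameter, and the collect helper are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// demuxByKey (hypothetical) sends each value to stream keyFn(v) mod numOut,
// so values with the same key keep their relative order in one stream.
func demuxByKey(done <-chan interface{}, numOut int, in <-chan int,
	keyFn func(int) int) []chan int {
	streams := make([]chan int, numOut)
	for i := range streams {
		streams[i] = make(chan int)
	}
	go func() {
		defer func() {
			for _, s := range streams {
				close(s)
			}
		}()
		for {
			select {
			case <-done:
				return
			case v, ok := <-in:
				if !ok {
					return
				}
				idx := keyFn(v) % numOut
				if idx < 0 {
					idx += numOut // Go's % keeps the sign of the dividend
				}
				select {
				case <-done:
					return
				case streams[idx] <- v:
				}
			}
		}
	}()
	return streams
}

// collect (hypothetical) drains every stream concurrently (the sends above are
// unbuffered, so reading one stream at a time would deadlock) and returns the
// values seen on each stream, in arrival order.
func collect(streams []chan int) [][]int {
	got := make([][]int, len(streams))
	var wg sync.WaitGroup
	wg.Add(len(streams))
	for i, s := range streams {
		go func(i int, s <-chan int) {
			defer wg.Done()
			for v := range s {
				got[i] = append(got[i], v) // each goroutine writes only got[i]
			}
		}(i, s)
	}
	wg.Wait()
	return got
}

func main() {
	done := make(chan interface{})
	defer close(done)
	in := make(chan int)
	go func() {
		defer close(in)
		for v := 0; v < 9; v++ {
			in <- v
		}
	}()
	// Key = v % 3, so 0, 3, 6 share a stream and stay in that order.
	fmt.Println(collect(demuxByKey(done, 3, in, func(v int) int { return v % 3 })))
	// [[0 3 6] [1 4 7] [2 5 8]]
}
```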
Parallel Execution
We execute each stream in its own goroutine. Parallel execution does not guarantee any ordering across channels (although items within one demuxed channel are still processed in order), so we define a result struct that records which channel and input produced each value.
```go
type result struct {
	channel int // index of the demuxed stream that produced this result
	input   int // the value read from that stream
	value   int // fn(input)
}

func execute(done <-chan interface{},
	inputChans []chan int, fn func(int) int) []chan result {
	var (
		numInputChans = len(inputChans)
		streams       = make([]chan result, numInputChans)
	)
	for i := 0; i < numInputChans; i++ {
		streams[i] = make(chan result)
	}
	var wg sync.WaitGroup
	// do processes one stream sequentially, so results within a stream keep
	// their input order.
	do := func(chanID int, input <-chan int) {
		defer wg.Done()
		for {
			select {
			case <-done:
				return
			case val, ok := <-input:
				if !ok {
					return
				}
				r := result{channel: chanID, input: val, value: fn(val)}
				select {
				case <-done:
					return
				case streams[chanID] <- r:
				}
			}
		}
	}
	wg.Add(numInputChans)
	for i, c := range inputChans {
		go do(i, c)
	}
	// Close every output stream once all workers have returned.
	go func() {
		wg.Wait()
		for _, s := range streams {
			close(s)
		}
	}()
	return streams
}
```
Muxing
Finally, we multiplex the result channels back into a single stream (the fan-in pattern).
```go
func mux(done <-chan interface{},
	inputChans []chan result) <-chan result {
	var (
		numInputChans = len(inputChans)
		stream        = make(chan result)
	)
	var wg sync.WaitGroup
	// multiplex forwards every value of one input channel to the shared stream.
	multiplex := func(c <-chan result) {
		defer wg.Done()
		for v := range c {
			select {
			case <-done:
				return
			case stream <- v:
			}
		}
	}
	wg.Add(numInputChans)
	for _, c := range inputChans {
		go multiplex(c)
	}
	// Close the merged stream only after every input has been drained.
	go func() {
		wg.Wait()
		close(stream)
	}()
	return stream
}
```
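mux interleaves the streams in whatever order values arrive. If you need the output back in the original input order, one option (not part of the original design) is to read the result channels in the same round-robin order demux used to write them. A sketch under those assumptions: orderedMux is hypothetical, it relies on round-robin demuxing with exactly one result per input, and main simulates execute's output instead of running the full pipeline.

```go
package main

import "fmt"

type result struct {
	channel int
	input   int
	value   int
}

// orderedMux (hypothetical) reads the result channels in the same round-robin
// order that demux wrote them, so the merged stream follows the input order.
// It stops at the first channel found closed: with round-robin distribution,
// that means every earlier item has already been emitted.
func orderedMux(done <-chan interface{}, inputChans []chan result) <-chan result {
	stream := make(chan result)
	go func() {
		defer close(stream)
		for i := 0; len(inputChans) > 0; i = (i + 1) % len(inputChans) {
			var (
				v  result
				ok bool
			)
			select {
			case <-done:
				return
			case v, ok = <-inputChans[i]:
				if !ok {
					return
				}
			}
			select {
			case <-done:
				return
			case stream <- v:
			}
		}
	}()
	return stream
}

func main() {
	done := make(chan interface{})
	defer close(done)

	// Simulate execute's output for inputs 0..5 split round-robin over 2 channels.
	chans := []chan result{make(chan result), make(chan result)}
	for c := range chans {
		go func(c int) {
			defer close(chans[c])
			for in := c; in < 6; in += len(chans) {
				chans[c] <- result{channel: c, input: in, value: in * 2}
			}
		}(c)
	}
	for v := range orderedMux(done, chans) {
		fmt.Printf("[%d]: in: %d, out: %d\n", v.channel, v.input, v.value)
	}
}
```

The trade-off is head-of-line blocking: one slow item now holds back every result behind it, which the unordered mux deliberately avoids.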
Final Result
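The final code and the output of a sample run are as follows. The interleaving of the two channels can differ from run to run; only the order within each channel is preserved.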
```go
package main

import (
	"fmt"
	"sync"
)

// generator streams the input slice until it is exhausted or done is closed.
func generator(done <-chan interface{}, input []int) <-chan int {
	var (
		stream = make(chan int)
	)
	go func() {
		defer close(stream)
		for _, v := range input {
			select {
			case <-done:
				return
			case stream <- v:
			}
		}
	}()
	return stream
}

// demux distributes inStream round-robin over numOut streams.
func demux(done <-chan interface{},
	numOut int, inStream <-chan int) []chan int {
	var (
		streams = make([]chan int, numOut)
	)
	// Do not forget to initialize each channel.
	for i := 0; i < numOut; i++ {
		streams[i] = make(chan int)
	}
	go func() {
		defer func() {
			for _, st := range streams {
				close(st)
			}
		}()
		for i := 0; ; {
			select {
			case <-done:
				return
			case val, ok := <-inStream:
				if !ok {
					return
				}
				select {
				case <-done:
					return
				case streams[i] <- val:
				}
				i = (i + 1) % numOut
			}
		}
	}()
	return streams
}

type result struct {
	channel int
	input   int
	value   int
}

// execute runs fn over each stream in its own goroutine.
func execute(done <-chan interface{},
	inputChans []chan int, fn func(int) int) []chan result {
	var (
		numInputChans = len(inputChans)
		streams       = make([]chan result, numInputChans)
	)
	for i := 0; i < numInputChans; i++ {
		streams[i] = make(chan result)
	}
	var wg sync.WaitGroup
	do := func(chanID int, input <-chan int) {
		defer wg.Done()
		for {
			select {
			case <-done:
				return
			case val, ok := <-input:
				if !ok {
					return
				}
				r := result{channel: chanID, input: val, value: fn(val)}
				select {
				case <-done:
					return
				case streams[chanID] <- r:
				}
			}
		}
	}
	wg.Add(numInputChans)
	for i, c := range inputChans {
		go do(i, c)
	}
	go func() {
		wg.Wait()
		for _, s := range streams {
			close(s)
		}
	}()
	return streams
}

// mux merges the result streams into a single stream (fan-in).
func mux(done <-chan interface{},
	inputChans []chan result) <-chan result {
	var (
		numInputChans = len(inputChans)
		stream        = make(chan result)
	)
	var wg sync.WaitGroup
	multiplex := func(c <-chan result) {
		defer wg.Done()
		for v := range c {
			select {
			case <-done:
				return
			case stream <- v:
			}
		}
	}
	wg.Add(numInputChans)
	for _, c := range inputChans {
		go multiplex(c)
	}
	go func() {
		wg.Wait()
		close(stream)
	}()
	return stream
}

func doSomething(i int) int {
	// do something
	return i * 2
}

func main() {
	done := make(chan interface{})
	defer close(done)
	var (
		input = []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
	)
	for v := range mux(done, execute(done, demux(done, 2, generator(done, input)), doSomething)) {
		fmt.Printf(">> [%d]: in: %d, out: %d\n", v.channel, v.input, v.value)
	}
	fmt.Printf(">> done\n")
}
```
```
>> [0]: in: 0, out: 0
>> [0]: in: 2, out: 4
>> [1]: in: 1, out: 2
>> [1]: in: 3, out: 6
>> [0]: in: 4, out: 8
>> [1]: in: 5, out: 10
>> [0]: in: 6, out: 12
>> [1]: in: 7, out: 14
>> [1]: in: 9, out: 18
>> [0]: in: 8, out: 16
>> done
```
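Because the two channels interleave differently from run to run, a test of this pipeline should check the per-channel order rather than the exact output. A minimal sketch, assuming the results have been collected from mux into a slice; inOrderPerChannel is a hypothetical helper.

```go
package main

import "fmt"

type result struct {
	channel int
	input   int
	value   int
}

// inOrderPerChannel (hypothetical) reports whether, for every channel, the
// inputs appear in strictly increasing order. With round-robin demuxing of an
// increasing input, that is the ordering the pipeline guarantees.
func inOrderPerChannel(results []result) bool {
	last := map[int]int{} // channel -> last input seen on it
	for _, r := range results {
		if prev, seen := last[r.channel]; seen && prev >= r.input {
			return false
		}
		last[r.channel] = r.input
	}
	return true
}

func main() {
	// The sample run above, transcribed.
	run := []result{
		{0, 0, 0}, {0, 2, 4}, {1, 1, 2}, {1, 3, 6}, {0, 4, 8},
		{1, 5, 10}, {0, 6, 12}, {1, 7, 14}, {1, 9, 18}, {0, 8, 16},
	}
	// 9 before 8 is fine: they are on different channels.
	fmt.Println(inOrderPerChannel(run)) // true
}
```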
This article assumes you are already familiar with Go, goroutines, and channels. If you're not, I strongly recommend the book "Concurrency in Go." It is a great resource on Go's concurrency features (the first two chapters are verbose and a bit pedantic, but I believe the book is worth reading). Frankly, this article's implementation draws heavily on the book's pipeline and fan-out/fan-in patterns.