I am learning the Go programming language and building a small CLI tool for work with it. The tool's main purpose is to generate a large number of synthetic FHIR resources and insert them into a GCP FHIR store using its REST API.
My secondary goal for the tool is to make it as fast as possible. This is why I decided to make as many of the REST calls in parallel as possible, taking advantage of Go's built-in concurrency primitives. In the process I learned about goroutines, channels, sync.WaitGroup, and a neat worker pool pattern that is broadly applicable.
Concurrency with goroutines
My first attempt at this was to make every single HTTP request to the server in parallel. I wrote a function that takes a slice of closures (one closure per HTTP request) and invokes them concurrently, using a goroutine for each. It worked!
func ConcurrentDo(actions []func()) {
    done := make(chan bool)
    for _, a := range actions {
        f := func(action func(), done chan<- bool) {
            action()
            done <- true
        }
        go f(a, done)
    }
    // Wait for all actions to signal completion before returning.
    for i := 0; i < len(actions); i++ {
        <-done
    }
}
A few things to note here:
- It is important to pass the loop variable a as a parameter to the goroutine function to make sure it is copied. This is because Go's loop iterations reuse the same instance of the loop variable a, which means that all closures created in the loop would otherwise share the same instance of a (the last one). To avoid this we make an explicit copy of the variable by passing it to the function. The language designers long considered this behavior a mistake that could not be fixed in a backward-compatible way; as of Go 1.22, however, the loop variable is scoped per iteration, so this workaround is no longer needed in newer code.
- A channel is used to signal completion, and we wait to receive a completion signal from all goroutines before exiting the function. In other words, from the caller's perspective ConcurrentDo behaves synchronously. This makes it simple to reason about while under the hood it operates concurrently.
- It is possible to pass a channel around as a send-only channel, chan<- (as done with done above), or as a receive-only channel, <-chan, as sketched below.
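As a minimal sketch of channel directions (the function names here are illustrative, not part of my tool), the compiler enforces that a send-only parameter is never received from and vice versa:

func produce(results chan<- int) {
    // produce may only send on results; receiving
    // from it would be a compile-time error.
    results <- 42
}

func consume(results <-chan int) int {
    // consume may only receive from results; sending
    // on it would be a compile-time error.
    return <-results
}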
Simplify with a WaitGroup
It was quickly pointed out to me that there is sync.WaitGroup in the Go standard library, which is designed exactly for waiting on a group of goroutines. So I swapped the channel I was using for signaling completion with a WaitGroup. This resulted in simpler and slightly more efficient code.
func ConcurrentDo(actions []func()) {
    var wg sync.WaitGroup
    wg.Add(len(actions))
    for _, a := range actions {
        go func(action func()) {
            defer wg.Done()
            action()
        }(a)
    }
    wg.Wait()
}
An interesting thing to note is that the code uses defer to call wg.Done(). This means that even if the call to action panics, we still signal the WaitGroup correctly and it does not hang. In practice this is a moot point since my CLI program does not recover from panics and simply terminates regardless. However, I figured using defer is a good habit to get into.
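For illustration only (my CLI does not do this), here is a hedged sketch of how one could combine defer with recover so that a panicking action neither crashes the program nor hangs the WaitGroup; it assumes the standard log package is imported:

go func(action func()) {
    defer wg.Done()
    defer func() {
        // Deferred functions run last-in-first-out, so this
        // recover runs before wg.Done above; both still run.
        if r := recover(); r != nil {
            log.Printf("action panicked: %v", r)
        }
    }()
    action()
}(a)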
Worker Pool Pattern in Go
This worked well until I started to ratchet up the number of actions. Go was more than happy to spawn thousands of goroutines, but the HTTP requests inside them would start failing. It turned out that my program's process was running out of its quota for opening sockets to make HTTP requests. The macOS default limit for open files per process (which includes sockets) is a meager 256 (determined via ulimit -n). I could have bumped this limit up on my machine, but that would have been just a local fix. So I went with the more sensible choice of limiting the number of concurrent HTTP requests, which works well for all users. This is where the Worker Pool comes into play.
Worker Pool is a common concurrency pattern where a number of actions are distributed across a set of workers. Each worker picks a task from a queue, completes it, and then picks another one until the queue is empty. The parallelism factor is now controlled by the number of workers. Below is how the ConcurrentDo function changes to implement this pattern:
func ConcurrentDo(actions []func(), maxWorkers int) {
    workers := maxWorkers
    if len(actions) < maxWorkers {
        workers = len(actions)
    }
    var wg sync.WaitGroup
    wg.Add(workers)
    // Use a buffered channel with the same size as the worker
    // count to ensure at most that many actions are buffered.
    work := make(chan func(), workers)
    for i := 0; i < workers; i++ {
        go func() {
            defer wg.Done()
            // Continuously grab an action from the channel and
            // complete it until the channel is closed.
            for action := range work {
                action()
            }
        }()
    }
    for _, a := range actions {
        work <- a
    }
    close(work)
    wg.Wait()
}
Let’s break this down:
- The actions are no longer invoked directly in a loop; instead they are sent over a buffered channel from which the workers in the pool receive. The Go channel is itself the work queue!
- We create one goroutine per worker (instead of one per action as before). Each goroutine iteratively picks up actions from the work channel and completes them.
- We still use a sync.WaitGroup, but now it waits for the workers to finish rather than for individual actions. We could instead have counted actions in the WaitGroup (see the sketch after this list), but I felt waiting on the workers is slightly more readable without much of a performance cost.
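For comparison, here is a rough sketch of the alternative mentioned in the last point, counting actions in the WaitGroup instead of workers; only the differing part of ConcurrentDo is shown:

var wg sync.WaitGroup
wg.Add(len(actions))
for i := 0; i < workers; i++ {
    go func() {
        for action := range work {
            action()
            wg.Done() // one Done per action instead of one per worker
        }
    }()
}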
It was a nice surprise to realize that I can use range with a channel and it works as one would naturally expect. The other improvement I made was to call close on the work channel. Closing the channel ends the range loop in the workers that are waiting for more work. Again, I really enjoy how this all fits together in Go.
Finally, the work channel could instead be unbuffered, in which case each send would block until a worker is ready to receive, or it could be buffered large enough to hold all actions at once. I felt it was more prudent to use a buffered channel whose size matches the number of workers. Once I get a chance I will compare these options more deeply, in particular in terms of performance.
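To tie it all together, here is a rough sketch of how a caller might drive the final ConcurrentDo. The uploadAll helper, URL, and payloads are hypothetical stand-ins rather than my tool's actual FHIR API calls, and it assumes the standard bytes, log, and net/http packages are imported:

func uploadAll(client *http.Client, url string, payloads [][]byte) {
    actions := make([]func(), 0, len(payloads))
    for _, p := range payloads {
        body := p // explicit copy for the closure, as discussed earlier
        actions = append(actions, func() {
            resp, err := client.Post(url, "application/fhir+json", bytes.NewReader(body))
            if err != nil {
                log.Printf("request failed: %v", err)
                return
            }
            resp.Body.Close()
        })
    }
    // Cap concurrency well below the per-process open-file limit.
    ConcurrentDo(actions, 50)
}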
Final Words
Overall my experience with Go concurrency has been very pleasant so far. The various primitives, such as the atomics-based sync.WaitGroup and message-passing-based channels, mesh well together and provide a powerful set of building blocks.