ETL pipelines in Go
I’ve been porting a service written for Node.js to Go. It’s proving a useful exercise for exploring how to implement an ETL pipeline in Go, in particular how to leverage some of Go’s concurrency features.
When complete, the service will extract data from a Postgres database, perform transformations on the data, write the output to an Excel spreadsheet and upload it to another service.
The domain is the logistics industry, so the data is about shipments. A trivial transformation could be to sum the total weight of the parcels in the shipment. The dataset could be very large (millions of shipments) so streaming is clearly important. As each shipment is read out of Postgres we want to transform, convert to Excel format and (multipart) upload as a pipeline. We don’t want that amount of data in memory.
This can, of course, be achieved in Go without concurrency. So this post will start with a non-concurrent pipeline and build up to various versions of a concurrent pipeline. It won’t delve into converting to Excel format or tackle multipart uploads. Instead, just focusing on extracting from Postgres, performing an arbitrary transformation and writing to a file on disk is enough to be instructive.
Let’s get stuck in!
Background
Before we look at any pipeline code, let’s briefly set some background.
Postgres can be run in Docker with some shipments seeded using the docker-entrypoint-initdb.d
volume as follows:
CREATE TABLE Shipments(
    shipmentId bigint NOT NULL UNIQUE,
    consignmentNumber text NOT NULL,
    CONSTRAINT PK_Shipments PRIMARY KEY (shipmentId)
);
INSERT INTO Shipments(shipmentId, consignmentNumber)
SELECT n, 'consNum' || n FROM generate_series(1, 1000) n;
Each shipment retrieved from Postgres will be decoded to a simple struct:
package model
type Shipment struct {
    ShipmentId        int
    ConsignmentNumber string
    TotalWeight       int
}
Note TotalWeight isn’t a column in the Shipments table. It will be a field derived during the pipeline transformation stage.
Postgres access
There are two options for accessing Postgres in Go:
- the database/sql package in the standard library plus a driver
- a fully-fledged third-party library
pgx can be used for both options, as sketched below. Even though we won’t be using Postgres-specific features, the latter option is sensible for the sake of simplicity and because the service won’t be targeting any other SQL databases.
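As a rough sketch (mine, not from the service, and assuming pgx v5), here is how each option gets wired up; the standard-library route registers a database/sql driver named “pgx” via the github.com/jackc/pgx/v5/stdlib package:
// Sketch only: contrasting the two options, assuming pgx v5.
package main

import (
    "context"
    "database/sql"
    "os"

    "github.com/jackc/pgx/v5"
    _ "github.com/jackc/pgx/v5/stdlib" // registers the "pgx" driver for database/sql
)

func main() {
    ctx := context.Background()
    uri := os.Getenv("POSTGRES_URI")

    // Option 1: the standard library's database/sql plus a driver.
    db, err := sql.Open("pgx", uri)
    if err != nil {
        // error handling omitted in this sketch
    }
    defer db.Close()

    // Option 2: pgx used directly as a fully-fledged library.
    conn, err := pgx.Connect(ctx, uri)
    if err != nil {
        // error handling omitted in this sketch
    }
    defer conn.Close(ctx)
}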
I decided to write a package for all Postgres access, partly due to single responsibility concerns but primarily to keep non-pipeline code to a minimum in main.
There are two structs involved as follows:
package pg
type Store struct {
    conn *pgx.Conn
}
type ShipmentsCursor struct {
    rows *pgx.Rows
    Err  error
}
Although Store
and ShipmentsCursor
are both exported, conn
and rows
can and should be kept internal to the pg
package.
There is a function returning a new store. One of the benefits of using a package is that we can centralise any error wrapping.
func NewStore(ctx context.Context) (_ Store, err error) {
    defer func() {
        if err != nil {
            err = fmt.Errorf("creating new postgres store: %w", err)
        }
    }()
    pgConn, err := pgx.Connect(ctx, os.Getenv("POSTGRES_URI"))
    if err != nil {
        return Store{}, err
    }
    err = pgConn.Ping(ctx)
    if err != nil {
        return Store{}, err
    }
    return Store{conn: pgConn}, nil
}
Next, we need a method on Store
for getting a ShipmentsCursor
. It looks too trivial here, but another benefit of using a package would become clearer as the complexity of the SQL query increases.
func (store Store) GetShipmentsCursor(ctx context.Context) (ShipmentsCursor, error) {
    rows, err := store.conn.Query(ctx, "select shipmentId, consignmentNumber from Shipments")
    if err != nil {
        return ShipmentsCursor{}, fmt.Errorf("getting shipments cursor: %w", err)
    }
    return ShipmentsCursor{rows: &rows}, nil
}
To complete the first pass of implementing our pg
package we need a method to get the next shipment from a cursor. This is very similar to the standard database/sql
approach (just via pgx
). There are other pgx-specific approaches that could be taken; one is sketched after the method below.
func (cursor *ShipmentsCursor) GetNextShipment(shipment *model.Shipment) bool {
    rows := *cursor.rows
    if !rows.Next() {
        cursor.Err = rows.Err()
        return false
    }
    if err := rows.Scan(&shipment.ShipmentId, &shipment.ConsignmentNumber); err != nil {
        cursor.Err = fmt.Errorf("getting next shipment: %w", err)
        return false
    }
    return true
}
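As an aside, one pgx-specific alternative is pgx.ForEachRow, which scans each row into the supplied destinations and invokes a callback per row, so it still streams. The following ForEachShipment helper is a hypothetical sketch assuming pgx v5; it isn’t used in the rest of this post:
// Hypothetical sketch, assuming pgx v5: ForEachRow scans each row into the
// supplied destinations and calls the callback once per row, so nothing is buffered.
func (store Store) ForEachShipment(ctx context.Context, fn func(model.Shipment) error) error {
    rows, err := store.conn.Query(ctx, "select shipmentId, consignmentNumber from Shipments")
    if err != nil {
        return fmt.Errorf("querying shipments: %w", err)
    }
    var shipment model.Shipment
    _, err = pgx.ForEachRow(rows, []any{&shipment.ShipmentId, &shipment.ConsignmentNumber}, func() error {
        return fn(shipment)
    })
    return err
}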
And with that we can move on to pipeline code!
For the sake of brevity, all the following sections will omit the code for creating/getting and closing a Store
, a ShipmentsCursor
and an os.File
. They will also omit error handling. Each section will simply discuss a function with the signature func(pg.ShipmentsCursor, io.Writer)
.
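For orientation, a rough sketch of that omitted wiring might look like the following. The Close methods on Store and ShipmentsCursor are assumed helpers (thin wrappers over conn.Close and rows.Close) that aren’t shown in this post, and the module path is made up:
// Rough sketch of the omitted setup; module path and Close helpers are assumptions.
package main

import (
    "context"
    "log"
    "os"

    "example.com/etl/pg" // hypothetical module path for the pg package
)

func main() {
    ctx := context.Background()

    store, err := pg.NewStore(ctx)
    if err != nil {
        log.Fatal(err)
    }
    defer store.Close(ctx) // assumed helper wrapping conn.Close

    cursor, err := store.GetShipmentsCursor(ctx)
    if err != nil {
        log.Fatal(err)
    }
    defer cursor.Close() // assumed helper wrapping rows.Close

    f, err := os.Create("shipments.out")
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()

    nonConcurrent(cursor, f) // or any of the pipeline functions that follow
}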
Non-concurrent pipeline
There’s not a great deal to talk about here…
Memory usage is fine. Because pgCursor.GetNextShipment takes a *model.Shipment, the variable nextShipment is reused on every iteration and therefore occupies a single location in memory.
However, because we’re processing a single shipment at a time, speed isn’t going to be optimised. For some scenarios that may well be perfectly acceptable.
func nonConcurrent(pgCursor pg.ShipmentsCursor, w io.Writer) {
    jsonEnc := json.NewEncoder(w)
    var nextShipment model.Shipment
    for pgCursor.GetNextShipment(&nextShipment) {
        nextShipment.TotalWeight = rand.Int()
        jsonEnc.Encode(nextShipment)
    }
}
Concurrent pipeline #1 - non-waiting
The easiest way to get a bit of concurrency involved is to fire off some goroutines and let them at it.
We use a channel to pass shipments to them. Whichever goroutine is free next takes the next shipment from the channel, so the ordering of shipments returned by Postgres is not preserved.
A bigger problem, though, is that as soon as all shipments have been read from Postgres the channel is closed and the function returns without waiting for the goroutines to finish. The effect is that not all transformed shipments are guaranteed to be written to w. Indeed, during local testing, between 2 and 4 shipments were consistently lost.
func concurrentNonWaiting(pgCursor pg.ShipmentsCursor, w io.Writer) {
    shipments := make(chan model.Shipment)
    jsonEnc := json.NewEncoder(w)
    for range 5 {
        go func() {
            for shipment := range shipments {
                shipment.TotalWeight = rand.Int()
                jsonEnc.Encode(shipment)
            }
        }()
    }
    var nextShipment model.Shipment
    for pgCursor.GetNextShipment(&nextShipment) {
        shipments <- nextShipment
    }
    close(shipments)
}
Concurrent pipeline #2 - waiting
Of the two problems, the easiest to fix is the premature pipeline completion. We introduce a wait group and another goroutine to handle tidying up.
wg.Wait
pauses the “main” goroutine until the wait counter is zero. The wait counter is decremented by wg.Done
when each “pipeline” goroutine completes. Because we’re using an unbuffered channel, we know the “postgres” goroutine can’t complete until every shipment it writes to the channel has been received.
These features ensure an orderly goroutine clean-up, and allow all transformed shipments to be written to w.
func concurrentWaiting(pgCursor pg.ShipmentsCursor, w io.Writer) {
    var wg sync.WaitGroup
    wg.Add(5)
    shipments := make(chan model.Shipment)
    jsonEnc := json.NewEncoder(w)
    for range 5 {
        go func() {
            // "pipeline" goroutine
            defer wg.Done()
            for shipment := range shipments {
                shipment.TotalWeight = rand.Int()
                jsonEnc.Encode(shipment)
            }
        }()
    }
    go func() {
        // "postgres" goroutine
        defer close(shipments)
        var nextShipment model.Shipment
        for pgCursor.GetNextShipment(&nextShipment) {
            shipments <- nextShipment
        }
    }()
    // "main" goroutine
    wg.Wait()
}
Concurrent pipeline #3 - batches
As a reminder, the second problem to solve is maintaining the ordering of shipments returned by Postgres. And since we need to stream data, we can’t hold all the transformed shipments in memory and re-order them at the end.
The solution is to introduce batching to the pipeline. Each batch of shipments read from Postgres must be transformed and re-ordered before being written to w, and that must all happen before the next batch is started.
There is a jump in complexity to achieve this. I found a blog article on go.dev invaluable for understanding how to go about it. That article introduces two ideas, illustrated with a small generic sketch after this list:
- Connecting each pipeline stage by channels (whereas so far we’ve only used one channel).
- Having communication/notification channels.
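To make those two ideas concrete before we apply them to shipments, here is a tiny generic sketch of my own (not from the go.dev article): two stages connected by channels, plus a done channel used purely as a notification that the source is exhausted.
// Generic sketch: gen emits values and notifies via done when exhausted;
// double is a middle stage that only connects channels.
package main

import "fmt"

func gen(done chan<- any, nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
        done <- struct{}{} // notification: nothing left to emit
    }()
    return out
}

func double(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * 2
        }
    }()
    return out
}

func main() {
    done := make(chan any, 1) // buffered so the notification never blocks the source
    for n := range double(gen(done, 1, 2, 3)) {
        fmt.Println(n)
    }
    <-done
}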
Let’s consider the three pipeline stages, which can be pulled out to their own functions and discussed separately.
GetNextShipmentBatch
In the pg
package we implement a new method to get the next shipment batch from a cursor. The Postgres interaction is essentially the same as GetNextShipment, except that we only retrieve size
shipments. The concurrency features are familiar, making use of a WaitGroup
, a goroutine for retrieving the shipments, and a goroutine for tidying up.
The main new thing to notice is the done channel. This is a notification channel: when the pipeline is processing the final batch and Next returns false, we send a notification. The value sent is struct{}{}, an empty struct; the contents of the notification don’t matter, only that it arrives.
func (cursor *ShipmentsCursor) GetNextShipmentBatch(size int, done chan<- any) <-chan model.Shipment {
    out := make(chan model.Shipment)
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        for range size {
            hasNext := (*cursor.rows).Next()
            if !hasNext {
                cursor.Err = (*cursor.rows).Err()
                done <- struct{}{}
                return
            }
            var shipment model.Shipment
            if err := (*cursor.rows).Scan(&shipment.ShipmentId, &shipment.ConsignmentNumber); err != nil {
                cursor.Err = fmt.Errorf("getting next shipment batch: %w", err)
                done <- struct{}{}
                return
            }
            out <- shipment
        }
    }()
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}
transformBatch
transformBatch
has very similar patterns to GetNextShipmentBatch
. However, there is no notification channel passed, as transformBatch
is a stage in the middle of the pipeline.
Although shipment.TotalWeight = rand.Int()
is hardly a complicated transformation, having it pulled out to a pipeline stage illustrates how more involved transformations would be handled concurrently (a hypothetical example of one follows the function below).
func transformBatch(size int, in <-chan model.Shipment) <-chan model.Shipment {
    out := make(chan model.Shipment)
    var wg sync.WaitGroup
    wg.Add(size)
    for range size {
        go func() {
            defer wg.Done()
            for shipment := range in {
                shipment.TotalWeight = rand.Int()
                out <- shipment
            }
        }()
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}
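As a hypothetical illustration of a more involved transformation, summing parcel weights (as mentioned at the start of the post) might look something like the sketch below. Parcel isn’t part of this post’s schema or code; it’s made up for the example:
// Hypothetical sketch only: Parcel is not part of this post's schema.
type Parcel struct {
    Weight int
}

// sumParcelWeights adds up the weights of a shipment's parcels.
func sumParcelWeights(parcels []Parcel) int {
    total := 0
    for _, p := range parcels {
        total += p.Weight
    }
    return total
}
Inside the stage, shipment.TotalWeight = rand.Int() would become something like shipment.TotalWeight = sumParcelWeights(parcels), with the parcels retrieved or carried alongside each shipment.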
processBatch
Again, processBatch
uses similar patterns to what we’ve already seen. There is another done
notification channel, but this time for notifying that the batch is fully processed.
func processBatch(size int, jsonEnc *json.Encoder, in <-chan model.Shipment, done chan<- any) {
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        shipments := make([]model.Shipment, 0, size)
        for shipment := range in {
            shipments = append(shipments, shipment)
        }
        sort.Slice(shipments, func(i, j int) bool {
            return shipments[i].ShipmentId < shipments[j].ShipmentId
        })
        for _, shipment := range shipments {
            jsonEnc.Encode(shipment)
        }
    }()
    go func() {
        wg.Wait()
        done <- struct{}{}
    }()
}
main
Finally, we get to the code running in the “main” goroutine! Again, we use a WaitGroup
to make sure that the pipeline isn’t exited prematurely. What is different is how we handle the goroutine exit conditions. We use a “for-select” to loop indefinitely, executing whichever case next receives a notification.
The done
case/channel will only receive a notification when all shipments have been read from Postgres. We don’t exit immediately at this point, as the last batch will still be executing. Instead we set a local flag, so that the next time the batchDone
case/channel receives a notification we know we can safely exit.
func concurrentBatches(pgCursor pg.ShipmentsCursor, w io.Writer) {
    batchSize := 5
    jsonEnc := json.NewEncoder(w)
    batchDone := make(chan any)
    done := make(chan any)
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer func() {
            close(batchDone)
            close(done)
            wg.Done()
        }()
        isDone := false
        for {
            select {
            case <-batchDone:
                if isDone {
                    return
                }
                shipmentBatch := pgCursor.GetNextShipmentBatch(batchSize, done)
                transformedBatch := transformBatch(batchSize, shipmentBatch)
                processBatch(batchSize, jsonEnc, transformedBatch, batchDone)
            case <-done:
                isDone = true
            }
        }
    }()
    batchDone <- struct{}{} // start
    wg.Wait()
}
Conclusion
It’s always easiest to learn something new when “real” requirements are involved. That’s why porting a solved problem from another language is particularly instructive: you already understand the problem domain and get to focus on applying the idioms of another language to it.
There’s still plenty to solve with this problem domain in Go. I would like to add profiling in order to get a sense of the optimum batch size, i.e. memory consumption vs speed. There is also the handling of error scenarios to figure out, e.g. what happens if one of the pipeline stages errors? How do you idiomatically handle errors from non-“main” goroutines?
But, as far as this post goes, I hope you’ve found some of the above useful!