I recently started a new job at Spartan Race and was not only given a new project, but also the choice of what technology to use. At a high level, the project was simple: pull data from one provider’s REST endpoint; based on that, pull data from a second REST endpoint from the same provider; finally, correlate that second set of data to two other data sets from a different provider and store the results. There was a little more to it than that, but it’s a good overview and enough to set the stage.
Until recently, Spartan had been a JavaScript shop. While Node.js and js-framework-of-the-week were more than capable of handling a 200 req/sec website, the technical debt the site had accumulated gave the newly-hired technology leaders ample reason to consider alternatives. They chose to bring in Ruby/Rails for some quick wins with admin tooling and prototyping, and Elixir/Phoenix to handle the main website and high-volume data needs.
I could have chosen Ruby, but I was concerned that any failure could crash the entire process. I was also concerned about the speed at which I’d be able to retrieve the data at each step in the process, how to pipeline that data, and how to coordinate the different threads that would be involved. Well, that’s what I told myself to justify not choosing Ruby. The real reason, of course, is I wanted the opportunity to do something with Elixir. This project was the perfect fit.
I won’t bore you with the nature of the data I was working with, so for the remainder of this article, I’ll keep it simple and Spartan related with three types of data: Race, Entry (i.e. athletes “enter” a race), and Athlete. A Race can “have many” Entries, and each Entry “belongs to” an Athlete.
If each data type were a REST endpoint, we might do something like this:
http://example.com/races
http://example.com/races/:id/entries
http://example.com/athlete/:id
You wouldn’t want to do that sequentially, because that is 1 (call to races) + number of Races (1 entries call/race) + total # of Entries (1 athlete call/entry). If you have 10 races with 20 athletes each, the total number of calls would be 1 + 10 + 200 = 211. Instead, you’d want to have different processes (Elixir processes) handling the different steps, and more than one when possible.
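To make "different processes handling the different steps" concrete, here is a minimal sketch using Task.async_stream from Elixir's standard library. It is not the GenStage approach this article ends up with, only an illustration of the fan-out. The FanOutSketch module and its three fetch functions are hypothetical stand-ins that return canned data instead of calling the REST endpoints.

```elixir
defmodule FanOutSketch do
  # Hypothetical stand-ins for the three REST calls; no network involved.
  def fetch_races, do: Enum.map(1..3, &%{id: &1})

  def fetch_entries(race) do
    Enum.map(1..2, &%{race_id: race.id, athlete_id: race.id * 10 + &1})
  end

  def fetch_athlete(entry), do: %{id: entry.athlete_id, race_id: entry.race_id}

  def run do
    fetch_races()
    # One process per race, with at most 4 running at a time.
    |> Task.async_stream(&fetch_entries/1, max_concurrency: 4)
    |> Stream.flat_map(fn {:ok, entries} -> entries end)
    # A second set of processes for the athlete lookups.
    |> Task.async_stream(&fetch_athlete/1, max_concurrency: 4)
    |> Enum.map(fn {:ok, athlete} -> athlete end)
  end
end

athletes = FanOutSketch.run()
# 3 races x 2 entries each = 6 athletes
IO.inspect(length(athletes))
```

The max_concurrency option caps how many requests are in flight per step, but nothing here lets a slow downstream step slow down the steps feeding it.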
My first pass at the problem used GenServer for all the things. There was a server to store the list of races, one to store the list of entries, and yet another to handle another data store. I had servers to handle parsing the data, compiling the data, and inserting the data. It was too much, but when all you have is a hammer…
I eventually ran into a blocker with the mess I’d created and reached out to the KC Elixir Group for help. The first thing someone asked was, “Why aren’t you using GenStage for that?”
The simple answer is: 1) I didn’t see GenStage in Elixir’s standard library; and 2) I thought I’d heard something recently about a GenSomethingOrOther component getting deprecated.
Now I know differently.
GenStage is “a specification” for “implement[ing] systems that exchange events in a demand-driven way.” This definition makes a lot of sense if you already know what GenStage is, but for everyone else it might be easier to think of it as a library that allows you to chain a series of processes together. Each process (i.e. stage) can be a “producer”, a “consumer”, or a combination of the two (i.e. a producer/consumer). Consumers create demand by requesting data from producers, which in turn either return the requested data or act as consumers themselves and send demand further upstream. As data flows down from the top “producer”, each link (i.e. stage) in the chain has the opportunity to modify the incoming data prior to responding to the requesting consumer.
Let’s take GenStage out of it for a moment and look at a real-life scenario: searching for a race on https://spartan.com
It’s a very Elixir-like way of processing data, and almost like there’s some sort of invisible hand at work in it.
With the bright and shiny GenStage tool added to my toolbox, I set off to rework my processes: Agents were terminated, GenServers were mutilated, and Supervisors got reorged. It was…unpleasant.
As you’ll recall, we’re looking to import athlete information. To get athlete information, we must first know what their entries are, and to get that, we have to retrieve all the races. It’s a perfect fit for GenStage. Ideally, I wanted the following stages:
Unfortunately I wasn’t able to make GenStage work with the above stages. I couldn’t figure out how to make the second stage (producer/consumer) return more data than went into it, and was stuck with a 1:1 ratio of data. One item went into a stage and only one item would come out.
I knew the Elixir team wouldn’t design something so limiting, so in my spare time I set out to understand what I was doing wrong.
To make things easy on myself, I stepped away from the project data and reworked the examples from the GenStage documentation. Instead of using the module names A, B, and C (cleverly named as they may be), I changed them to Publisher, PubSub, and Subscriber respectively.
Let’s look first at the Publisher module:
defmodule Publisher do
  use GenStage

  def start_link() do
    GenStage.start_link(Publisher, 0, name: Publisher)
  end

  def init(counter) do
    {:producer, counter}
  end

  def handle_demand(demand, 6) when demand > 0 do
    # Stop everything when the state reaches 6. i.e. 5 iterations
    System.halt(0)
    # Never reached, because System.halt/1 does not return;
    # kept as a well-formed callback return value.
    {:stop, :normal, 6}
  end

  def handle_demand(demand, counter) when demand > 0 do
    # return the incremented counter
    {:noreply, [counter + 1], counter + 1}
  end
end
Beginning with the start_link/0 and init/1 functions, you’ll see that we’re setting the state to 0 and declaring this module to be a :producer. In the second handle_demand/2 function, we’re returning an incremented counter, and also incrementing the state by one. The first handle_demand/2 function halts the whole VM (via System.halt/1) when the state reaches 6.
Let’s skip the PubSub module for a moment and look at Subscriber.
defmodule Subscriber do
  use GenStage

  def start_link() do
    GenStage.start_link(Subscriber, :ok)
  end

  def init(:ok) do
    {:consumer, :na, subscribe_to: [{PubSub, max_demand: 1}]}
  end

  def handle_events(events, _from, state) do
    Process.sleep(250)
    # Print each event on its own line, whatever the batch size.
    Enum.each(events, &IO.puts/1)
    {:noreply, [], state}
  end
end
The module starts up by declaring itself to be a :consumer, with no state (:na), and subscribes itself to the PubSub module. Then, the handle_events/3 function sleeps for a quarter of a second and prints the data to stdout. Finally, it returns the standard response for a :consumer: :noreply, no events, and an irrelevant state.
Now let’s look at the real worker of this system, PubSub:
defmodule PubSub do
  use GenStage

  def start_link() do
    GenStage.start_link(PubSub, :ok, name: PubSub)
  end

  def init(:ok) do
    {:producer_consumer, :na, subscribe_to: [{Publisher, max_demand: 1}]}
  end

  # With max_demand: 1, each batch holds exactly one event.
  def handle_events([event], _from, state) do
    events = Enum.map(1..10, &("#{event} :: #{&1}"))
    {:noreply, events, state}
  end
end
Again, the module starts up, declares itself to be a :producer_consumer, sets its state to something meaningless, and subscribes to the Publisher module. The interesting work happens in the handle_events/3 function.
As the documentation is currently written, it gives the impression that handle_events/3 takes in a collection of “events”, transforms them, and then outputs the same number of items. In reality, any number of “events” can be returned. It’s only when the list of “events” has been depleted that demand will be sent up to the producer module.
In the code above, the Publisher module is producing events from 1 to 6. As those events are read in by the PubSub module, it is effectively multiplying its results by 10. The output looks like this:
1 :: 1
1 :: 2
1 :: 3
1 :: 4
1 :: 5
...
5 :: 6
5 :: 7
5 :: 8
5 :: 9
5 :: 10
Even though we began with five items in the Publisher module, we ended up outputting 50 items, showing that we are not limited to a mere 1:1 relationship between producer, producer_consumer, and consumer, but rather a 1:n relationship. As more producer_consumer stages are added to the flow, the relationship changes to a 1:n:n, vastly increasing what’s possible.
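Applied to the race data from earlier, the same 1:n idea might look like the sketch below. Everything in it is an assumption made for illustration: the RaceProducer and EntryFetcher module names, the fake_entries/1 helper that stands in for a call to /races/:id/entries, and the gen_stage version requirement passed to Mix.install. Unlike PubSub above, handle_events/3 here uses Enum.flat_map/2, so it accepts a batch of any size and still emits several entries per race.

```elixir
# Assumption: gen_stage is fetched from Hex; the version requirement is a guess.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule RaceProducer do
  use GenStage

  def start_link(races), do: GenStage.start_link(__MODULE__, races, name: __MODULE__)

  def init(races), do: {:producer, races}

  # Hand out as many races as were asked for and keep the rest as state.
  def handle_demand(demand, races) when demand > 0 do
    {events, remaining} = Enum.split(races, demand)
    {:noreply, events, remaining}
  end
end

defmodule EntryFetcher do
  use GenStage

  def start_link, do: GenStage.start_link(__MODULE__, :ok, name: __MODULE__)

  def init(:ok) do
    {:producer_consumer, :na, subscribe_to: [{RaceProducer, max_demand: 2}]}
  end

  # Takes a batch of races of any size and emits three entries per race (1:n).
  def handle_events(races, _from, state) do
    entries = Enum.flat_map(races, &fake_entries/1)
    {:noreply, entries, state}
  end

  # Hypothetical stand-in for GET /races/:id/entries.
  defp fake_entries(race), do: Enum.map(1..3, &"#{race} :: entry #{&1}")
end

# Start the producer first so EntryFetcher can subscribe to it by name.
{:ok, _} = RaceProducer.start_link(["sprint", "super"])
{:ok, _} = EntryFetcher.start_link()

# GenStage.stream/1 acts as the consumer; take the six entries we expect.
entries = GenStage.stream([EntryFetcher]) |> Enum.take(6)
Enum.each(entries, &IO.puts/1)
```

The start order matters for the Publisher, PubSub, and Subscriber modules too: each stage subscribes by name in init/1, so the stage it subscribes to should already be running.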
Much like a Kanban board, GenStage allows us to view our processes and data in “stages”. In so doing, we can more easily reason through the requirements of each stage, locate and eliminate bottlenecks, distribute processing, and adjust to demand. In fact, that’s what the goals of the GenStage project are:
One of the original motivations for creating and designing Elixir was to introduce better abstractions for working with collections. Not only that, we want to provide developers interested in manipulating collections with a path to take their code from eager to lazy, to concurrent, and then distributed.
– José Valim, Announcing GenStage
GenStage was the right tool for the project I was working on. It allowed me to easily group the data by sources, make only the requests I needed, and most importantly, reason about the flow of the data. Not only this, but it allowed us to break out the requests into multiple processes without overloading our data providers. Elixir and GenStage trivialized what could have been a threading nightmare.