Among many other features, the Adjust Dashboard allows our clients to see how their Facebook campaigns perform. A customer can connect their Facebook account to the Dashboard, and after a while the number of clicks, impressions and spend appears under the Facebook network trackers. The apparent simplicity of this feature is deceiving: to make it work, we need to fetch data from Facebook for every client, perpetually.
In today’s blog post I would like to show how a proper back-pressure mechanism helps us send millions of HTTP requests to Facebook per day, and how we implemented it using GenStage.
But first, some context
One Adjust account can have multiple Facebook accounts associated with it. A client adds Facebook accounts using OAuth authentication through the Adjust MMP (Mobile Measurement Partner) Facebook app. Every Facebook account can have multiple so-called AdsAccounts; clients use individual AdsAccounts to run their Facebook campaigns. The information about campaign performance is available via the Facebook Ads Insights Marketing API. With Facebook accounts integrated with proper credentials and AdsAccounts synced, one can finally fetch data from Facebook. We picked Elixir as the implementation language for the project responsible for getting this data.
Original implementation
The original implementation used the easiest and most straightforward way to run code concurrently in Elixir: `Task.async`. We would iterate through all Adjust accounts which have Facebook accounts and spawn one process per Adjust account. Then, in each of these processes, we would fetch data from Facebook concurrently, firing HTTP requests for all available Facebook AdsAccounts: one request, one task. All tasks were then passed to `Task.await`, the fetched data was put into a queue, and a `Processor` process was started per Adjust `account_id`. Each `Processor` process gets the data from the queue, does some additional transformations and stores the data in the database.

As you can see, the original implementation was pretty straightforward: get all AdsAccounts, fetch the data using `Task.async`/`Task.await`, put the fetched data into the queue and process it.
However, over time we started to observe the limitations of this architecture. We got more and more clients with Facebook accounts integrated, meaning we would spawn more and more concurrent processes to fetch Facebook data. Not only was the Facebook API unhappy about receiving so many requests, but our service was also struggling to digest all these processes and the data they fetched.
Back-pressure? GenStage!
Whenever you need a back-pressure mechanism in Elixir, the answer is obvious: it’s `GenStage`. I like the wording from the GenStage announcement:
In the short-term, we expect GenStage to replace the use cases for GenEvent as well as providing a composable abstraction for consuming data from third-party systems.
This is exactly what we needed: fetching a lot of data from a third-party service with back-pressure in place.
`GenStage` brings the concepts of Producer and Consumer. A Producer holds events in its state; a Consumer subscribes to the Producer and consumes events according to some rules. `GenStage` comes with a variety of behaviours for Consumers which dictate the way events are going to be consumed. Once a Consumer is subscribed to a Producer, it demands events from the Producer, and the Producer handles the demand in the `handle_demand/2` callback. However, `handle_demand/2` is not the only place from which a Producer can send events to a Consumer: the `handle_call/3`, `handle_info/2` and `handle_cast/2` callbacks have an additional element in their return tuple, so they can send events to the Consumer too! Another important detail to note is that once a Consumer has asked a Producer for demand, it never asks for more until it gets all the events it previously asked for.
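To make these roles concrete, here is a minimal toy example (not from our codebase), runnable as a single `.exs` script: a producer that emits an endless stream of integers, and a consumer that prints them, with `max_demand`/`min_demand` controlling how many events are in flight:

```elixir
# A toy GenStage pipeline, runnable as a standalone script.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule CounterProducer do
  use GenStage

  def start_link(initial), do: GenStage.start_link(__MODULE__, initial, name: __MODULE__)

  def init(initial), do: {:producer, initial}

  # The consumer's demand arrives here; we answer with a list of events.
  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end

defmodule PrinterConsumer do
  use GenStage

  def start_link(_), do: GenStage.start_link(__MODULE__, :ok)

  def init(:ok) do
    # max_demand/min_demand control how many events are in flight at once.
    {:consumer, :ok, subscribe_to: [{CounterProducer, max_demand: 10, min_demand: 5}]}
  end

  def handle_events(events, _from, state) do
    IO.inspect(events, label: "consumed")
    {:noreply, [], state}
  end
end

{:ok, _} = CounterProducer.start_link(0)
{:ok, _} = PrinterConsumer.start_link([])
Process.sleep(50)
```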
The implementation
`GenStage` can provide us with back-pressure, but how does it fit the task at hand? To illustrate that, let me introduce the steps involved in the processing.
- fetch Facebook accounts from the database
- for each Facebook account fetch Facebook Ads accounts from the database
- for each Facebook Ads account ask Facebook for active campaigns
- for each Campaign fetch Insights from Facebook API
- store the fetched data
As you can see, there is a lot of repetition of the ‘for each’ statement, meaning every ‘event’ from the previous step produces more ‘events’ down the stream. Another important detail to note is that the Facebook API has a quota per Facebook account and per Facebook AdsAccount: after eating 100% of the quota, the Facebook API starts sending errors instead of actual data in the response.
For our purpose, the ConsumerSupervisor behaviour seemed to be the perfect fit. It works like a pool, but every consumed event gets its own separate process. ConsumerSupervisor restarts crashed processes and demands more events once `min_demand` processes terminate with `:normal` or `:shutdown` status. We could adapt it to our needs; this is how the very beginning of our flow looks.
```
+----------+   +-----------+      +------------+
| Accounts |   |Accounts   |      |AdsAccounts |
| Producer +<--+Consumer   +-+--->+Producer    |
|          |   |Supervisor | |    |            |
+----------+   +-----------+ |    +------------+
                             |    +------------+
                             |    |AdsAccounts |
                             +--->+Producer    |
                             |    |            |
                             |    +------------+
                             |    +------------+
                             |    |AdsAccounts |
                             +--->+Producer    |
                                  |            |
                                  +------------+
```
`AccountsProducer` is part of the application’s supervision tree, so it’s started when the app starts. It fetches active Facebook accounts from the database and puts them into its state. `AccountsConsumerSupervisor` is also part of the application’s supervision tree and is subscribed to `AccountsProducer`. Once `AccountsProducer` has the Facebook accounts in its state, `AccountsConsumerSupervisor` starts to consume them, spawning one process per consumed Facebook account. From the code perspective, it looks like the following.
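A rough sketch of these two modules (the database query is stubbed out, and the demand values are illustrative, not necessarily the ones we use in production):

```elixir
defmodule AccountsProducer do
  use GenStage

  def start_link(_), do: GenStage.start_link(__MODULE__, :ok, name: __MODULE__)

  def init(:ok) do
    # Load the active Facebook accounts up front and keep them as the
    # producer's events.
    {:producer, fetch_accounts_from_db()}
  end

  def handle_demand(demand, accounts) when demand > 0 do
    {events, rest} = Enum.split(accounts, demand)
    {:noreply, events, rest}
  end

  # Stub for the real database query.
  defp fetch_accounts_from_db, do: []
end

defmodule AccountsConsumerSupervisor do
  use ConsumerSupervisor

  def start_link(_), do: ConsumerSupervisor.start_link(__MODULE__, :ok)

  def init(:ok) do
    # One AdsAccountsProducer process per consumed Facebook account; a
    # :transient restart means :normal exits free up demand for more events.
    children = [
      %{
        id: AdsAccountsProducer,
        start: {AdsAccountsProducer, :start_link, []},
        restart: :transient
      }
    ]

    ConsumerSupervisor.init(children,
      strategy: :one_for_one,
      subscribe_to: [{AccountsProducer, max_demand: 10, min_demand: 5}]
    )
  end
end
```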
The number of events to demand initially and the threshold that triggers demand for more are specified by the `max_demand` and `min_demand` options respectively. This allows us to control how many Facebook accounts we process at once. Each `AdsAccountsProducer` gets an event (a Facebook `account_id`) from `AccountsProducer`. Once started, `AdsAccountsProducer` fetches from the database all Facebook Ads accounts which belong to the given Facebook account and puts them into its state. `AdsAccountsProducer` uses `Registry` to name processes. Using `Registry` allows us to comply with name registration restrictions. We also poll `Registry` to report the number of alive workers to our metrics collection system.
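A sketch of `AdsAccountsProducer` along those lines (the registry name, key shape and database query are stand-ins, not our exact production code):

```elixir
defmodule AdsAccountsProducer do
  use GenStage

  # ConsumerSupervisor calls start_link/1 with the consumed event: an account_id.
  def start_link(account_id) do
    GenStage.start_link(__MODULE__, account_id, name: via(account_id))
  end

  def init(account_id) do
    # Defer the database call so init/1 returns quickly.
    send(self(), :fetch_ads_accounts)
    {:producer, %{account_id: account_id, ads_accounts: []}}
  end

  def handle_info(:fetch_ads_accounts, state) do
    # In the full flow this is also where the producer starts a consumer
    # for itself (covered below).
    ads_accounts = fetch_ads_accounts_from_db(state.account_id)
    {:noreply, [], %{state | ads_accounts: ads_accounts}}
  end

  def handle_demand(demand, state) when demand > 0 do
    {events, rest} = Enum.split(state.ads_accounts, demand)
    {:noreply, events, %{state | ads_accounts: rest}}
  end

  # Registry-backed names comply with the name registration rules and let
  # us count alive workers for metrics.
  defp via(account_id) do
    {:via, Registry, {FacebookRegistry, {__MODULE__, account_id}}}
  end

  # Stub for the real database query.
  defp fetch_ads_accounts_from_db(_account_id), do: []
end
```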
Great, now we can ‘produce’ and ‘consume’ Facebook accounts, but what’s next? Each `AdsAccountsProducer` holds some AdsAccounts in its state, but there are no consumers to consume them and continue the flow. So why not spawn a consumer dynamically per `AdsAccountsProducer` and apply the same `ConsumerSupervisor` logic further down?
```
+----------+   +-----------+      +------------+   +-----------+      +------------+
| Accounts |   |Accounts   |      |AdsAccounts |   |AdsAccounts|      |Campaigns   |
| Producer +<--+Consumer   +-+--->+Producer    +<--+Consumer   +-+--->+Producer    |
|          |   |Supervisor | |    |            |   |Supervisor | |    |            |
+----------+   +-----------+ |    +------------+   +-----------+ |    +------------+
                             |                                   |    +------------+
                             |    +------------+                 |    |Campaigns   |
                             |    |AdsAccounts |                 +--->+Producer    |
                             +--->+Producer    |                 |    |            |
                             |    |            |                 |    +------------+
                             |    +------------+                 |    +------------+
                             |    +------------+                 |    |Campaigns   |
                             |    |AdsAccounts |                 +--->+Producer    |
                             +--->+Producer    |                      |            |
                                  |            |                      +------------+
                                  +------------+
```
Starting a consumer dynamically requires adding `AdsAccountsConsumerSupervisor.start_link(account_id, self())` to `handle_info/2` in `AdsAccountsProducer`, so that the producer starts a consumer for itself right after it puts the AdsAccounts into its state. The `self()` among the arguments is required so that `AdsAccountsConsumerSupervisor` knows which process it needs to subscribe to.
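A sketch of such a dynamically started supervisor (the child module and demand values are illustrative):

```elixir
defmodule AdsAccountsConsumerSupervisor do
  use ConsumerSupervisor

  # The producer passes self() so we know which process to subscribe to.
  def start_link(account_id, producer) do
    ConsumerSupervisor.start_link(__MODULE__, {account_id, producer})
  end

  def init({_account_id, producer}) do
    # One CampaignsProducer process per consumed AdsAccount.
    children = [
      %{
        id: CampaignsProducer,
        start: {CampaignsProducer, :start_link, []},
        restart: :transient
      }
    ]

    ConsumerSupervisor.init(children,
      strategy: :one_for_one,
      subscribe_to: [{producer, max_demand: 4, min_demand: 2}]
    )
  end
end
```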
Every `AdsAccountsProducer` starts its own consumer, which consumes AdsAccounts and spawns a `CampaignsProducer` per Facebook AdsAccount. `CampaignsProducer` gets an AdsAccount and a date to fetch, asks the Facebook API for the active campaigns running under the given AdsAccount on the given date, then puts the campaigns into its state and, you guessed it, starts a consumer for itself.
```
+----------+   +-----------+      +------------+   +-----------+      +------------+   +-----------+      +------------+
| Accounts |   |Accounts   |      |AdsAccounts |   |AdsAccounts|      |Campaigns   |   |Campaigns  |      |Insights    |
| Producer +<--+Consumer   +-+--->+Producer    +<--+Consumer   +-+--->+Producer    +<--+Consumer   +-+--->+Producer    |
|          |   |Supervisor | |    |            |   |Supervisor | |    |            |   |Supervisor | |    |            |
+----------+   +-----------+ |    +------------+   +-----------+ |    +------------+   +-----------+ |    +------------+
                             |                                   |                                   |    +------------+
                             |    +------------+                 |    +------------+                 |    |Insights    |
                             |    |AdsAccounts |                 |    |Campaigns   |                 +--->+Producer    |
                             +--->+Producer    |                 +--->+Producer    |                 |    |            |
                             |    |            |                 |    |            |                 |    +------------+
                             |    +------------+                 |    +------------+                 |    +------------+
                             |    +------------+                 |    +------------+                 |    |Insights    |
                             |    |AdsAccounts |                 |    |Campaigns   |                 +--->+Producer    |
                             +--->+Producer    |                 +--->+Producer    |                      |            |
                                  |            |                      |            |                      +------------+
                                  +------------+                      +------------+
```
Every `InsightsProducer` gets a Facebook `campaign_id`, fetches Insights from the Facebook Marketing API and puts the fetched data into its state.
Unfortunately, `InsightsProducer` cannot store the data yet. A Facebook campaign’s Insights represent data per day, whereas at Adjust we have to store this data per hour to support timezones. Therefore a consumer for `InsightsProducer` needs to issue an additional HTTP request to the Facebook API for every Ad to get the hourly distribution. The fact that we have quite a lot of Ads to ask the hourly distribution for imposes limitations on the way we can consume Insights from every `InsightsProducer`. Consuming these events with the same ConsumerSupervisor behaviour would generate a lot of concurrent requests to Facebook even with a `max_demand` of 2, so the quota would be consumed quite fast. Therefore the consumer for `InsightsProducer` should consume events slowly and check the quota after every request. Fortunately, GenStage comes with a `manual` mode, which allows consuming events explicitly: once a Consumer is set into `manual` mode, there is no `max_demand` or `min_demand` anymore; one asks for events explicitly instead.
```
+----------+   +-----------+      +------------+   +-----------+      +------------+   +-----------+      +------------+   +----------+   +----------+
| Accounts |   |Accounts   |      |AdsAccounts |   |AdsAccounts|      |Campaigns   |   |Campaigns  |      |Insights    |   |CostData  |   |CostData  |
| Producer +<--+Consumer   +-+--->+Producer    +<--+Consumer   +-+--->+Producer    +<--+Consumer   +-+--->+Producer    +<--+Producer  +<--+Consumer  |
|          |   |Supervisor | |    |            |   |Supervisor | |    |            |   |Supervisor | |    |            |   |Consumer  |   |          |
+----------+   +-----------+ |    +------------+   +-----------+ |    +------------+   +-----------+ |    +------------+   +----------+   +----------+
                             |                                   |                                   |    +------------+   +----------+   +----------+
                             |    +------------+                 |    +------------+                 |    |Insights    |   |CostData  |   |CostData  |
                             |    |AdsAccounts |                 |    |Campaigns   |                 +--->+Producer    +<--+Producer  +<--+Consumer  |
                             +--->+Producer    |                 +--->+Producer    |                 |    |            |   |Consumer  |   |          |
                             |    |            |                 |    |            |                 |    +------------+   +----------+   +----------+
                             |    +------------+                 |    +------------+                 |    +------------+   +----------+   +----------+
                             |    +------------+                 |    +------------+                 |    |Insights    |   |CostData  |   |CostData  |
                             |    |AdsAccounts |                 |    |Campaigns   |                 +--->+Producer    +<--+Producer  +<--+Consumer  |
                             +--->+Producer    |                 +--->+Producer    |                      |            |   |Consumer  |   |          |
                                  |            |                      |            |                      +------------+   +----------+   +----------+
                                  +------------+                      +------------+
```
`CostDataProducerConsumer` is set into manual mode. It’s started by `InsightsProducer`, demands one event (one ad) at a time, sends a request to the Facebook API, gets the data and passes it to `CostDataConsumer`, which finally stores it in the database. After every request to the Facebook API, `CostDataProducerConsumer` checks the quota values in the response headers: if the quota is nearly depleted, it demands a new event from `InsightsProducer` with some delay using `Process.send_after/3`; otherwise, if the quota values allow, it does so immediately. Note that a consumer of `InsightsProducer` is actually a ProducerConsumer, because it both consumes and produces events. Here is how one can set a Consumer or ProducerConsumer into `manual` mode.
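Here is a sketch of such a ProducerConsumer in `manual` mode. The HTTP call and quota check are stubbed out, and the back-off delay is an illustrative value:

```elixir
defmodule CostDataProducerConsumer do
  use GenStage

  def start_link(producer), do: GenStage.start_link(__MODULE__, producer)

  def init(producer) do
    {:producer_consumer, %{producer: nil}, subscribe_to: [{producer, max_demand: 1}]}
  end

  # Switch the incoming subscription to manual mode and ask for the first
  # event ourselves.
  def handle_subscribe(:producer, _opts, from, state) do
    GenStage.ask(from, 1)
    {:manual, %{state | producer: from}}
  end

  # The outgoing subscription (to CostDataConsumer) stays automatic.
  def handle_subscribe(:consumer, _opts, _from, state), do: {:automatic, state}

  def handle_events([ad], _from, state) do
    cost_data = fetch_hourly_distribution(ad)

    # Check the quota after every request: back off when it is nearly
    # depleted, otherwise demand the next event immediately.
    if quota_nearly_depleted?(cost_data) do
      Process.send_after(self(), :ask, :timer.seconds(30))
    else
      GenStage.ask(state.producer, 1)
    end

    {:noreply, [cost_data], state}
  end

  def handle_info(:ask, state) do
    GenStage.ask(state.producer, 1)
    {:noreply, [], state}
  end

  # Stubs for the HTTP call and the response-header quota check.
  defp fetch_hourly_distribution(ad), do: %{ad: ad}
  defp quota_nearly_depleted?(_cost_data), do: false
end
```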
That is finally the end of the flow. So far it looks like the following:
- `AccountsProducer` is started by the main Supervisor; it gets accounts from the database and puts them into its state
- `AccountsConsumerSupervisor` is started by the main Supervisor; it subscribes to `AccountsProducer`, consumes events (accounts) and spawns one `AdsAccountsProducer` per account
- Each `AdsAccountsProducer` fetches the Facebook account’s AdsAccounts from the database, puts them into its state and dynamically starts a ConsumerSupervisor for itself
- `AdsAccountsConsumerSupervisor` consumes AdsAccounts and spawns one `CampaignsProducer` per AdsAccount
- Each `CampaignsProducer` gets an AdsAccount, fetches its active campaigns from the Facebook API, puts them into its state and starts a `CampaignsConsumerSupervisor` for itself
- `CampaignsConsumerSupervisor` consumes campaigns and spawns one `InsightsProducer` per campaign
- Each `InsightsProducer` gets the campaign’s Insights from the Facebook API, puts the data into its state and starts a consumer for itself
- The consumer for `InsightsProducer` is `CostDataProducerConsumer`; it’s set into `manual` mode and consumes events one by one: for every consumed event (an ad) it sends an additional HTTP request to the Facebook API, gets the data and passes it on to `CostDataConsumer`
- `CostDataConsumer` gets all the data, does some transformations (timezone conversion, currency conversion, etc.) and puts the data into the database
Phew. There is a lot happening here, and although it might look complicated, the architecture is in fact quite simple. The same ConsumerSupervisor behaviour is applied several times to process multiple Facebook accounts, AdsAccounts and Campaigns concurrently and without blocking each other.
Now, the question is how and when a producer process exits with `:normal` or `:shutdown` status, so that the ConsumerSupervisors can demand more events and spawn more processes. Let’s follow the termination path, i.e. how these GenStage processes get terminated, starting with the last part of the flow: `InsightsProducer` - `CostDataProducerConsumer` - `CostDataConsumer`. `CostDataProducerConsumer` demands events from `InsightsProducer` one by one and passes them down the flow to `CostDataConsumer`.
Every time an event is consumed, `CostDataProducerConsumer` asks its `InsightsProducer` how many events are left in its state. When the answer is zero, `CostDataProducerConsumer` sends an event to `CostDataConsumer` indicating that it was the last one. Let’s see how this could be implemented in the `handle_events/3` callback.
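A sketch of that callback, shown as excerpts with only the relevant functions; `events_left/1` and the `{:last_event, _}` marker are illustrative names, not necessarily the real ones:

```elixir
defmodule InsightsProducer do
  # Producer callbacks elided; relevant here is only that the producer can
  # answer a synchronous call with the number of events left in its state.
  def events_left(producer), do: GenStage.call(producer, :events_left)

  def handle_call(:events_left, _from, insights) do
    {:reply, length(insights), [], insights}
  end
end

defmodule CostDataProducerConsumer do
  def handle_events([ad], _from, %{producer: producer} = state) do
    cost_data = fetch_hourly_distribution(ad)

    # Ask the producer how many events it still holds; when the answer is
    # zero, tag the outgoing event so the consumer knows it was the last one.
    events =
      case InsightsProducer.events_left(producer) do
        0 -> [{:last_event, cost_data}]
        _ -> [cost_data]
      end

    GenStage.ask(producer, 1)
    {:noreply, events, state}
  end

  # Stub for the HTTP call.
  defp fetch_hourly_distribution(ad), do: %{ad: ad}
end
```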
After that, `CostDataConsumer` has 10 seconds to finish processing and storing the last batch of events. After 10 seconds it terminates itself with `:normal` status.
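A sketch of `CostDataConsumer` with that shutdown timer (storage and transformations are stubbed out):

```elixir
defmodule CostDataConsumer do
  use GenStage

  def start_link(producer_consumer) do
    GenStage.start_link(__MODULE__, producer_consumer)
  end

  def init(producer_consumer) do
    {:consumer, :ok, subscribe_to: [{producer_consumer, max_demand: 1}]}
  end

  def handle_events(events, _from, state) do
    Enum.each(events, &store/1)

    # The {:last_event, _} marker means the producer ran dry: give
    # ourselves 10 seconds to finish storing, then terminate normally.
    if Enum.any?(events, &match?({:last_event, _}, &1)) do
      Process.send_after(self(), :stop, :timer.seconds(10))
    end

    {:noreply, [], state}
  end

  def handle_info(:stop, state) do
    {:stop, :normal, state}
  end

  # Stub for the transformation and database write.
  defp store({:last_event, cost_data}), do: store(cost_data)
  defp store(_cost_data), do: :ok
end
```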
Since `CostDataConsumer`, `CostDataProducerConsumer` and `InsightsProducer` are linked using `start_link/3`, the termination of `CostDataConsumer` terminates `InsightsProducer` and `CostDataProducerConsumer` with the same status. Once `InsightsProducer` goes down with `:normal` status, `CampaignsConsumerSupervisor` can demand more campaigns from `CampaignsProducer` and spawn more `InsightsProducer`s for the new campaigns.
Now let’s see how `CampaignsProducer` and `AdsAccountsProducer` terminate themselves. The logic is the same for both of these producers, so let me show in detail how `CampaignsProducer` exits with `:normal` status. `CampaignsProducer` checks its state every 5 seconds; when there are no more campaigns left to process and no `InsightsProducer`s are active, it exits with `:normal` status, which allows `AdsAccountsConsumerSupervisor` to spawn more `CampaignsProducer`s for the newly consumed AdsAccounts.
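A sketch of that periodic self-check (the registry name, key shape and the Facebook API call are assumptions for illustration):

```elixir
defmodule CampaignsProducer do
  use GenStage

  @check_interval :timer.seconds(5)

  def start_link({ads_account_id, date}) do
    GenStage.start_link(__MODULE__, {ads_account_id, date})
  end

  def init({ads_account_id, date}) do
    Process.send_after(self(), :check_state, @check_interval)
    campaigns = fetch_active_campaigns(ads_account_id, date)
    {:producer, %{ads_account_id: ads_account_id, campaigns: campaigns}}
  end

  def handle_demand(demand, state) when demand > 0 do
    {events, rest} = Enum.split(state.campaigns, demand)
    {:noreply, events, %{state | campaigns: rest}}
  end

  # Every 5 seconds: when no campaigns are left in the state and no
  # InsightsProducer processes are alive, exit with :normal so that
  # AdsAccountsConsumerSupervisor can spawn a new CampaignsProducer.
  def handle_info(:check_state, %{campaigns: []} = state) do
    if consumers_alive?(state.ads_account_id, InsightsProducer) do
      Process.send_after(self(), :check_state, @check_interval)
      {:noreply, [], state}
    else
      {:stop, :normal, state}
    end
  end

  def handle_info(:check_state, state) do
    Process.send_after(self(), :check_state, @check_interval)
    {:noreply, [], state}
  end

  defp consumers_alive?(ads_account_id, child_module) do
    # Counts live child processes via Registry; the registry name and key
    # shape here are illustrative.
    Registry.lookup(FacebookRegistry, {child_module, ads_account_id}) != []
  end

  # Stub for the Facebook API call.
  defp fetch_active_campaigns(_ads_account_id, _date), do: []
end
```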
`AdsAccountsProducer` has the same logic; the only difference is the ConsumerSupervisor name in the `consumers_alive?/2` function.
The only GenStage processes which never go down (unless there is an exception) are `AccountsProducer` and `AccountsConsumerSupervisor`. Once the number of accounts in `AccountsProducer` approaches zero, it repopulates its state with more accounts from the database, so it never stops producing events.
Summary
`GenStage` allows a developer to build sophisticated data flows with back-pressure in place. It provides the necessary abstractions for producing and consuming events. In combination with `Registry`, we could build a robust application which fetches and processes Facebook cost data for thousands of different AdsAccounts without them blocking each other. Every AdsAccount, Campaign or Ad is processed separately, and if any process crashes, GenStage’s ConsumerSupervisor restarts it. The application can dynamically speed up or slow down the flow by itself, based on the Facebook quota values.
This blog post has gotten long enough already, and I haven’t even started talking about one of the most important parts of the application: the HTTP client. We send over 6 million heavy, long-lasting HTTP requests to Facebook per day, so having a reliable and fast HTTP client is vital. That is going to be the topic of my next blog post. Stay tuned!