How we reduced Standard Audience calculation time by 80%
mParticle’s Audiences feature allows customers to define user segments and forward them directly to downstream tools for activation. Thanks to our engineering team’s recent project to optimize one of our audience products, mParticle customers will be able to engage high-value customers with even greater efficiency.
Audiences is one of the most flexible, powerful, and heavily leveraged tools within the mParticle platform. mParticle users can define audiences based on any user-related data captured from any source, be it a standard input like a web, iOS or Android app, or a partner feed. Once audiences are created, you can easily forward them to downstream activation partners to power a wide variety of use cases, like driving user engagement and app downloads or visualizing customer journeys, to name just a few.
As Audiences is central to the value that customers derive from mParticle, this feature set is a high priority when it comes to performance optimization work. This is why we recently undertook a project to overhaul Standard Audiences, a product that extends the functionality of regular Audiences. This article dives into how we researched potential areas of optimization, executed on these opportunities, and evaluated the results.
What is Standard Audiences?
The Standard Audiences feature on the mParticle platform allows you to define and build audiences based on historical user data, which is stored in Amazon S3, mParticle’s long-term data store. Since Standard Audiences are based on events that customers perform over a long period of time, they are very useful for targeting subsets of users who have demonstrated high lifetime value.
How are Standard Audiences calculated?
Standard Audience calculations use the same audience engine as real-time audiences. The main difference lies in how the user profile and events data are fed into the audience engine. Real-time audience calculation is a stream processing job that reads real-time user events from a stream. On the other hand, the Standard Audience calculation is a batch job that reads from S3.
A typical Standard Audience batch job needs to read many terabytes (up to a few petabytes) of data from S3. As such, it is obvious that we need to cut the whole job into smaller pieces and process them concurrently. A Standard Audience batch job can be represented as a DAG (Directed Acyclic Graph) that consists of many independent tasks, each of which is responsible for calculating audience membership for a small subset of users. In the end, the Standard Audience calculation results are stored in a unique location for each DAG in S3. The task dependencies, task status, and task workload specifications are managed by a home-grown batch job manager, which uses a MySQL database to store the DAG and task data and submits the tasks to AWS Batch for execution.
Identify opportunities for optimization
As the usage of offline audiences and the amount of user data on S3 grew organically, we noticed that some big Standard Audience DAGs consisting of hundreds of thousands of tasks could take six days to finish, which interrupted our customers’ workflows as they had to wait almost a week for results. We set out to investigate the bottlenecks in the system and learn what we could do to improve performance. We added more metrics and logging to every step of the audience calculation task, and this is what we found:
Finding 1: The logic behind API calls could be optimized
The audience calculation task makes API calls to a user profile service and batch job manager. When there are many concurrent calls, they can become sluggish or return failures. When a task fails, it will be retried up to four times. Since a task-level retry entails starting a task from the beginning, this is an expensive operation.
Solutions:
- Reexamine the API call logic and make only the minimum number of calls necessary.
- Size the external services/databases properly to handle the workload incurred by the largest Standard Audience DAG.
- Add retry logic with exponential backoff and jitter to all API calls.
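As an illustration of the last item, here is a minimal sketch of retrying a call with exponential backoff and jitter. It is not our production code: the function name, the default delays, and the choice of "full jitter" (sleeping a uniformly random time up to the exponential ceiling) are assumptions made for the example.

```python
import random
import time


def call_with_backoff(call, max_attempts=5, base_delay=0.5, max_delay=30.0,
                      sleep=time.sleep, rng=random.random):
    """Run call(); on failure, wait a randomized, exponentially growing delay and retry.

    All names and default values here are hypothetical, chosen for illustration.
    """
    for attempt in range(max_attempts):
        try:
            return call()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error to the caller
            # Exponential ceiling: base_delay * 2^attempt, capped at max_delay.
            ceiling = min(max_delay, base_delay * (2 ** attempt))
            # Full jitter: sleep a uniformly random duration in [0, ceiling).
            sleep(rng() * ceiling)
```

Retrying at the call level is much cheaper than a task-level retry, which restarts the whole task. The jitter spreads retries from many concurrent tasks over time so that they do not hit the service again in lockstep.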
Result:
- Reduced running time and cost by 20%.
Finding 2: The memory requirement for tasks could be reduced
The memory requirement for each individual task (configured in the AWS Batch job definition) was 30GB, a figure sized for the worst-case scenario. Most tasks do not need that much memory. We could run many more tasks concurrently on the same ECS cluster if we reduced the memory requirement per task.
Solutions:
- Reduce the default memory requirement to 7.5GB. If a task fails because it ran out of memory, retry it with twice the previous run’s memory requirement, capped at 30GB.
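The escalation rule can be sketched as follows. This is a simplified illustration, not the actual job manager code: `run_task`, `OutOfMemory`, and the constant names are hypothetical, and how an out-of-memory failure is detected in AWS Batch is left out.

```python
INITIAL_MEMORY_MB = 7680   # 7.5GB default from the text
MAX_MEMORY_MB = 30720      # 30GB cap from the text


class OutOfMemory(Exception):
    """Hypothetical signal that a task run was killed for exceeding its memory."""


def next_memory_mb(previous_mb):
    """Double the previous memory requirement, never exceeding the cap."""
    return min(previous_mb * 2, MAX_MEMORY_MB)


def run_with_memory_escalation(run_task):
    """Call run_task(memory_mb), doubling memory after each out-of-memory failure."""
    memory_mb = INITIAL_MEMORY_MB
    while True:
        try:
            return run_task(memory_mb)
        except OutOfMemory:
            if memory_mb >= MAX_MEMORY_MB:
                raise  # already at the cap: give up
            memory_mb = next_memory_mb(memory_mb)
```

With these values a task is attempted at 7.5GB, then 15GB, then 30GB, so only the small share of tasks that really need the worst-case allocation ever receive it.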
Result:
- A 4x increase in the number of tasks we could run on the same ECS cluster.
- A 2x increase in throughput, reducing the total DAG run time and cost by 50%. The gain is 2x rather than 4x because when four times as many containers, each using 7.5GB of memory, run on the same EC2 instance, each takes longer to finish, and a small percentage of them need to be retried with more memory. The end result is an approximately 2x increase in total throughput.
Finding 3: The data-finding logic could be made more efficient
The performance is IO-bound. Most of the time is spent retrieving data for the subset of users that an offline audience calculation task is responsible for. Data retrieval consists of two main steps: finding the data and downloading it. Finding data on S3 means issuing ListBucket calls from many concurrent tasks. Each call is relatively fast, but the aggregate time across all tasks is significant.
Solutions:
- Refactor the data-finding logic such that a small number of S3 ListBucket calls can find the data for all of the audience calculation tasks. Put this logic into a new task that all of the audience calculation tasks depend on.
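The idea can be sketched as a single upstream step that groups one listing's worth of keys into a per-task manifest. Everything specific here is an assumption made for illustration: the key layout with a `partition=` segment, the function names, and the manifest shape are hypothetical, and in practice the keys would come from a paginated S3 listing rather than an in-memory list.

```python
from collections import defaultdict


def partition_of(key):
    """Extract the partition id from a key such as 'events/partition=0007/part-0001'.

    This key layout is hypothetical, not mParticle's actual S3 layout.
    """
    for segment in key.split("/"):
        if segment.startswith("partition="):
            return segment[len("partition="):]
    raise ValueError(f"no partition segment in key: {key}")


def build_manifest(keys):
    """Group listed object keys by partition so each task gets its own key list."""
    manifest = defaultdict(list)
    for key in keys:
        manifest[partition_of(key)].append(key)
    return dict(manifest)


listed_keys = [
    "events/partition=0001/part-0000",
    "events/partition=0002/part-0000",
    "events/partition=0001/part-0001",
]
manifest = build_manifest(listed_keys)
```

Each audience calculation task then reads its own entry from the manifest and goes straight to downloading, instead of issuing its own listing calls.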
Result:
- A 30% average reduction in total DAG run time.
Finding 4: Partition sizes could be tuned
We use EC2 spot instances in the ECS clusters managed by AWS Batch. This means that they could be terminated at any time. On one hand, we want each individual task to finish quickly so that when they get killed due to spot instance termination, we don’t waste much CPU time. On the other hand, we want each individual task to be fairly large because downloading a larger amount of data from S3 in one request incurs less overhead. This means that the partition size for individual tasks should be tuned.
Solutions:
- Find the optimal individual task partition size through experiments.
Result:
- A 30% reduction in total DAG run time.
Now let’s crunch the numbers. The reductions compound multiplicatively, so the fraction of the original run time that remains is (1 - 20%)(1 - 50%)(1 - 30%)(1 - 30%) = 19.6%.
This is how we reduced Standard Audience calculation time and cost by 80%. The result is very noticeable. Now most Standard Audience DAGs finish within 10 hours, with the maximum being approximately one day on DAGs from our biggest customers.
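For readers who want to check the arithmetic, here is a short sketch that compounds the four reductions. The dictionary labels are shorthand for the four findings above. Each optimization shrinks whatever run time is left after the previous ones, which is why the reductions multiply rather than add up to an impossible 130%.

```python
# Per-finding reduction in total DAG run time, taken from the results above.
reductions = {
    "API call logic": 0.20,
    "task memory": 0.50,
    "data-finding logic": 0.30,
    "partition size": 0.30,
}

# Each step leaves (1 - reduction) of the time remaining after the previous step.
remaining = 1.0
for reduction in reductions.values():
    remaining *= 1 - reduction

total_reduction = 1 - remaining
print(f"remaining: {remaining:.1%}, total reduction: {total_reduction:.1%}")
```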
We continuously invest in the infrastructure that drives our customers’ outcomes, which is highly valuable. Also, these problems are really fun to solve, by very talented engineers. This project was at the intersection of Value, Fun, and Talent, which is very exciting!
Senior Vice President of Engineering, mParticle
Data-driven optimizations
In the solutions outlined above, we took a data-driven approach to finding the best parameters, such as partition size or initial memory requirement. Specifically, we reran several DAGs with different parameters, extracted the performance metrics, and then compared the results using Jupyter notebooks. We don’t guess. We let the data speak for themselves.
Metrics and monitoring
In order to track the performance and cost of the Standard Audience DAGs, we added metrics to track events scanned per DAG, bytes read per DAG, total latency per task, and time spent by the audience calculation engine per task, leveraging the automated cost reporting our data services team built in-house. We implemented two versions of these metrics, one for internal monitoring and the other for customer-facing metrics. For example, for events scanned per DAG, we track the actual total events scanned per DAG for internal monitoring (including events scanned by failed and then retried tasks), and track the total events scanned by successful task runs per DAG for customer billing purposes. If some tasks within a DAG got retried, this is something we should know about, but we should not charge customers for the cost of retries.
By analyzing the performance metrics, we were able to establish SLOs for the Standard Audience DAGs. One of the key SLOs is a total DAG latency of 24 hours. We set up Datadog monitoring for the SLOs so that we get alerted if they are breached.
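The difference between the two versions of the events-scanned metric can be shown with a small sketch. The `TaskRun` record and the function names are hypothetical, introduced only to illustrate the distinction described above.

```python
from dataclasses import dataclass


@dataclass
class TaskRun:
    """One attempt of one task in a DAG (hypothetical record shape)."""
    task_id: str
    attempt: int
    events_scanned: int
    succeeded: bool


def events_scanned_internal(runs):
    """Internal metric: every attempt counts, including failed runs that were retried."""
    return sum(run.events_scanned for run in runs)


def events_scanned_billable(runs):
    """Customer-facing metric: only successful runs count, so retries are not billed."""
    return sum(run.events_scanned for run in runs if run.succeeded)


runs = [
    TaskRun("task-1", attempt=1, events_scanned=1000, succeeded=False),
    TaskRun("task-1", attempt=2, events_scanned=1000, succeeded=True),
    TaskRun("task-2", attempt=1, events_scanned=500, succeeded=True),
]
```

In this example the internal metric reports 2,500 events scanned while the billable metric reports 1,500. The gap between the two is itself a useful signal of how much work is being lost to retries.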
Future work
After these optimizations, Standard Audience is still a highly IO-bound job. We believe that we can improve the performance further in the following ways.
- Build better indices for the user event data on S3, reducing the amount of data transferred.
- Add persistent checkpoints to the Standard Audience calculation task such that it can resume where it left off should it be killed due to spot instance termination.
- Devise better partitioning schemes to eliminate hot partitions. This would divide the work into more evenly sized partitions.