As engineers, our goal for Bronto’s segmentation feature is to provide our clients with a powerful, flexible, user-friendly tool that allows them to drill down into their contact data to find new marketing opportunities. Over the years, as Bronto has grown, we have progressed through several iterations of our segmentation engine in a never-ending quest for better performance.
In case you’re unfamiliar with our segmentation tool, a segment is composed of “rules” and “criteria.” A segment can contain 1 to M rules, and a rule can contain 1 to N criteria. By default, the results of sibling criteria within the same rule are ORed together, while the calculated results of rules are ANDed together. We also let users invert the boolean logic between rules and criteria.
In this manner we are able to provide the most flexibility to our users, as they can create any combination of ANDed and ORed boolean logic based upon how they nest their criteria into the same or different rules.
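To make the default logic concrete, here is a minimal Python sketch. The names `Rule`, `Segment`, and `matches` are hypothetical and not taken from Bronto's code. Criteria are modeled as plain predicates over a contact dictionary, and the inversion option is assumed to be a single flag that swaps AND and OR at both levels.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List

Contact = Dict[str, object]
Criterion = Callable[[Contact], bool]


@dataclass
class Rule:
    criteria: List[Criterion]

    def matches(self, contact: Contact, inverted: bool = False) -> bool:
        # Default: sibling criteria are ORed. Inverted: they are ANDed.
        combine = all if inverted else any
        return combine(c(contact) for c in self.criteria)


@dataclass
class Segment:
    rules: List[Rule]
    inverted: bool = False  # assumption: one flag flips both levels

    def matches(self, contact: Contact) -> bool:
        # Default: rule results are ANDed. Inverted: they are ORed.
        combine = any if self.inverted else all
        return combine(r.matches(contact, self.inverted) for r in self.rules)


# Illustrative criteria; the attribute names are made up for this sketch.
on_list = lambda c: 882221 in c["lists"]
low_value = lambda c: c["field_value"] < 1000
is_active = lambda c: c["status"] == "active"

segment = Segment(rules=[Rule([on_list, low_value]), Rule([is_active])])
contact = {"lists": [], "field_value": 500, "status": "active"}
print(segment.matches(contact))  # evaluates (on_list OR low_value) AND is_active
```

Putting two criteria in the same rule ORs them, while splitting them across two rules ANDs them, which is how nesting yields arbitrary combinations.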
In the early days of Bronto Software, the entire application was based on the LAMP stack. Thus the first segmentation engine was written in PHP and it queried MySQL for the relevant data. Let’s take an example segment and see how it was processed by the various incarnations of the segmentation engine as it evolved over the years. This particular example is a segment that is looking for active contacts who have been sent a specific set of deliveries but have not clicked on any links in those deliveries. The contacts must also be members of a specific list and have a specific field associated with the contact that has a value less than 1,000.
Segmentation Engine 1.0
The first iteration of segmentation was naive; our focus was on functionality rather than performance. Thus, when a user saved a segment in the UI we translated their selections into a giant SQL query and ran it all at once. As you might imagine, this could get hairy rather easily. If you’re familiar with SQL, you’d probably cringe if you saw a query that looked like this:
SELECT DISTINCT(contact.id) AS contact_id
FROM contact
LEFT JOIN tracking ON tracking.contact_id = contact.id
JOIN field ON field.contact_id = contact.id
JOIN list ON list.contact_id = contact.id
WHERE field.id = '99278266'
  AND field.value < 1000
  AND list.id = '882221'
  AND tracking.delivery_id IN (123432,982545,820205586,44646255,325816,852469)
  AND subscriber.status = 'active'
  AND contact.deleted = '0'
  AND contact.client_id = '2343675427'
  AND tracking.id IS NULL;
The query joins four different tables, each of which can be huge (at the time of this post, some of them contain billions of rows). But given the relatively small amount of data we had stored in MySQL when version 1.0 of segmentation was built, this approach was adequate.
Segmentation Engine 1.5
As our data sizes began to grow we realized we needed to make some optimizations. We still composed a monolithic SQL query, but we’d run it in batches against contacts, several thousand contacts at a time, so that we’d be less likely to run into MySQL timeouts or locking issues:
SELECT DISTINCT(contact.id) AS contact_id
FROM contact
LEFT JOIN tracking ON tracking.contact_id = contact.id
JOIN field ON field.contact_id = contact.id
JOIN list ON list.contact_id = contact.id
WHERE field.id = '99278266'
  AND field.value < '1000'
  AND list.id = '882221'
  AND tracking.delivery_id IN (123432,982545,820205586,44646255,325816,852469)
  AND subscriber.status = 'active'
  AND contact.deleted = '0'
  AND contact.client_id = '2343675427'
  AND tracking.id IS NULL
  AND contact.id IN (1,2,3,4,5...);
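The batching step can be sketched in a few lines of Python. This is only an illustration of splitting contact IDs into fixed-size chunks and producing the trailing IN clause; the original engine was written in PHP, and the helper names `batches` and `in_clause` are hypothetical.

```python
from typing import Iterator, List, Sequence


def batches(contact_ids: Sequence[int], size: int) -> Iterator[List[int]]:
    """Yield consecutive slices of at most `size` contact IDs."""
    for start in range(0, len(contact_ids), size):
        yield list(contact_ids[start:start + size])


def in_clause(batch: List[int]) -> str:
    # The IDs here are integers generated by this sketch; production code
    # should prefer parameterized queries over building SQL strings.
    return "contact.id IN (" + ",".join(str(i) for i in batch) + ")"


# Ten contacts in batches of four: the last batch is simply shorter.
for batch in batches(range(1, 11), size=4):
    print(in_clause(batch))
```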
Segmentation Engine 2.0
This was the first major architectural change to the segmentation engine. Bronto was growing rapidly and clients were putting more load on our databases, so we were actively seeking to remove table joins from the application wherever possible. This overhaul was still MySQL-based, but instead of a single query, each criterion on the segment was run as a separate query against batches of contacts. This let us further optimize the queries by ensuring that each criterion’s query used a MySQL index, which we couldn’t guarantee when running monolithic queries. Each criterion’s query updated a temporary MyISAM table in MySQL that held a bitmap per contact. Once all the criterion queries had finished, we ran a final query against the bitmaps to resolve the boolean logic and determine which contacts were in the segment. Another upside of this architecture was that clients could calculate a single criterion at a time in the UI, and we could keep track of per-criterion contact counts to display there.
Thus, instead of the monolithic queries, we’d run many smaller queries such as:
SELECT contact_id FROM field WHERE field.id = '99278266' AND field.value < '1000' AND field.contact_id IN (1,2,3,4,5...);
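The per-contact bitmap idea can be illustrated with a small Python sketch. It is an in-memory stand-in for the temporary MySQL table, under the assumption that each criterion owns one bit and each rule is represented by a mask of its criteria's bits; `record_matches`, `in_segment`, and the sample IDs are hypothetical.

```python
from typing import Dict, Iterable, List

MAX_CRITERIA = 32  # assumed bitmap width for this sketch


def record_matches(bitmaps: Dict[int, int], bit: int, matched_ids: Iterable[int]) -> None:
    """Set `bit` for every contact returned by one criterion's query."""
    if not 0 <= bit < MAX_CRITERIA:
        raise ValueError("criterion does not fit in the bitmap")
    for contact_id in matched_ids:
        bitmaps[contact_id] = bitmaps.get(contact_id, 0) | (1 << bit)


def in_segment(bitmap: int, rule_masks: List[int]) -> bool:
    """Default logic: at least one bit set per rule mask, for every rule."""
    return all(bitmap & mask for mask in rule_masks)


bitmaps: Dict[int, int] = {}
record_matches(bitmaps, 0, [1, 2, 3])  # criterion 0: member of the list
record_matches(bitmaps, 1, [2, 4])     # criterion 1: field value below 1000
record_matches(bitmaps, 2, [2, 3, 4])  # criterion 2: active contact

rule_masks = [0b011, 0b100]  # rule A = criterion 0 OR 1, rule B = criterion 2
members = sorted(cid for cid, bm in bitmaps.items() if in_segment(bm, rule_masks))
print(members)
```

Counting the contacts that have a given bit set gives the per-criterion counts, and the fixed bitmap width puts a hard cap on the number of criteria.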
Segmentation Engine 3.0
Eventually Bronto grew to the point that we were dealing with hundreds of millions of contacts and tens of billions of tracking events and other attributes upon which clients wanted to segment. We hit the limits of what we could do to make the MySQL queries run faster, so we decided to build a completely new segmentation engine from scratch. Instead of translating the rules and criteria that the client builds in the UI into SQL queries, we now translate them into HBase scans to read the necessary data from disk. We then pass this data into an in-house Java-based comparison predicate logic system that determines each contact’s membership by comparing the contact against the value(s) for which the criterion is searching. Depending on the amount of data the client owns (how many HBase regions’ worth of contacts they have), we either run the scan in-memory (faster for smaller clients) or run it as a MapReduce job, which takes more time to spool up but has higher processing throughput.
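As a rough picture of what a comparison predicate system does, here is a minimal Python sketch. The real engine is Java and its design is not described in this post, so the `Criterion` class, the operator names, and the attribute names below are all assumptions made for illustration.

```python
import operator
from dataclasses import dataclass
from typing import Any, Callable, Dict, Iterable

# Hypothetical operator vocabulary for this sketch.
OPERATORS: Dict[str, Callable[[Any, Any], bool]] = {
    "lt": operator.lt,
    "eq": operator.eq,
    "in": lambda actual, expected: actual in expected,
}


@dataclass(frozen=True)
class Criterion:
    criterion_id: int
    attribute: str
    op: str
    expected: Any

    def matches(self, contact: Dict[str, Any]) -> bool:
        # In this sketch a missing attribute never matches.
        if self.attribute not in contact:
            return False
        return OPERATORS[self.op](contact[self.attribute], self.expected)


def evaluate(contact: Dict[str, Any], criteria: Iterable[Criterion]) -> Dict[int, bool]:
    """Evaluate one already-loaded contact against every criterion."""
    return {c.criterion_id: c.matches(contact) for c in criteria}


criteria = [
    Criterion(1, "status", "eq", "active"),
    Criterion(2, "field_99278266", "lt", 1000),
    Criterion(3, "list_id", "in", {882221}),
]
row = {"status": "active", "field_99278266": 250, "list_id": 5}
print(evaluate(row, criteria))
```

The contact's data is read once and then checked against each criterion in memory, so adding a criterion adds a comparison rather than another query.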
The latest version of our segmentation engine has an additional feature: once a segment is saved and executed, the contact membership is kept up-to-date via our realtime event engine. The realtime event engine listens to the firehose of events, be they contact attribute updates, list membership updates, field updates, tracking events, etc. When a new event comes into the system, the engine determines which of a client’s segments might be affected by the event by looking up an index that associates the criterion types on each segment to the event types that affect them. It then re-evaluates the contact associated with the event for each of the associated segments and updates the contact’s membership on the segment if necessary.
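The lookup from event type to affected segments can be sketched as an inverted index. This is a simplified Python illustration, not the production design: the criterion type names, event type names, and the `EVENTS_BY_CRITERION_TYPE` mapping are invented for the example.

```python
from collections import defaultdict
from typing import Dict, List, Set

# Assumed mapping from criterion type to the event types that can change it.
EVENTS_BY_CRITERION_TYPE: Dict[str, Set[str]] = {
    "list_membership": {"list_update"},
    "field_value": {"field_update"},
    "clicked_delivery": {"click", "send"},
}


def build_index(segments: Dict[int, List[str]]) -> Dict[str, Set[int]]:
    """Map each event type to the segment IDs whose criteria it can affect."""
    index: Dict[str, Set[int]] = defaultdict(set)
    for segment_id, criterion_types in segments.items():
        for criterion_type in criterion_types:
            for event_type in EVENTS_BY_CRITERION_TYPE.get(criterion_type, ()):
                index[event_type].add(segment_id)
    return index


def segments_to_reevaluate(index: Dict[str, Set[int]], event_type: str) -> List[int]:
    """Return the segments that must re-check the event's contact."""
    return sorted(index.get(event_type, set()))


# Segment ID -> criterion types used on that segment (sample data).
segments = {10: ["list_membership", "field_value"], 11: ["clicked_delivery"]}
index = build_index(segments)
print(segments_to_reevaluate(index, "field_update"))
print(segments_to_reevaluate(index, "click"))
```

Only the segments returned by the lookup re-evaluate the contact, so an event never touches segments whose criteria it cannot affect.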
To give a sense of the increase in complexity, Segmentation 2.0 was ~5,000 lines of PHP while Segmentation 3.0 is ~50,000 lines of Java (excluding test code).
Advantages of the Hadoop-based system:
The MapReduce JobTracker will automatically set the number of mappers to scale with the size of the client’s data (the number of HBase regions). This type of per-client scalability wasn’t possible with MySQL.
MySQL-based segments became noticeably slower as you added more criteria because each criterion added its own set of queries, one per batch of the client’s contacts. The Hadoop-based segmentation engine flips the evaluation on its head by pulling all of the data for a single contact one time and evaluating it in the JVM against each criterion on the segment. Thus, more criteria just result in a few more CPU cycles per contact, which is orders of magnitude more efficient.
Also, we can now display a fairly accurate progress bar for calculations by polling our MapReduce JobTracker for its counters to see how many of the client’s contacts have been processed.
We were limited to 32 criteria per segment in the 2.0 engine due to our bitmap size; Hadoop-based segments can support thousands of criteria because we keep track of the criteria counts with Hadoop counters. The ‘counter groups’ are based upon the contact’s status while the counters themselves use the criterion ID as the key with the value being the number of matching contacts of that status.
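The counter layout described in the last point can be mimicked in plain Python. This sketch uses `collections.Counter` as a stand-in and does not call the Hadoop counter API; the function name `count_matches` and the sample statuses are assumptions.

```python
from collections import Counter, defaultdict
from typing import Dict, Iterable, Tuple


def count_matches(results: Iterable[Tuple[str, Dict[int, bool]]]) -> Dict[str, Counter]:
    """Group by contact status, then count matching contacts per criterion ID."""
    groups: Dict[str, Counter] = defaultdict(Counter)
    for status, per_criterion in results:
        for criterion_id, matched in per_criterion.items():
            if matched:
                groups[status][criterion_id] += 1
    return groups


# One entry per contact: (status, {criterion ID: matched?}).
results = [
    ("active", {1: True, 2: False}),
    ("active", {1: True, 2: True}),
    ("unsubscribed", {1: False, 2: True}),
]
counts = count_matches(results)
print(counts["active"][1], counts["active"][2], counts["unsubscribed"][2])
```

Because the counts are keyed by criterion ID rather than packed into a fixed-width bitmap, the number of criteria is not capped by a bit width.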
Challenges of the Hadoop-based system:
Not all criterion types have explicit events that cause contacts to leave the criterion as a member. In particular, this affects criteria with relative time frame operators. For example, let’s say we have a segment containing a criterion that finds contacts who have opened an email in the past two months. We know that if a contact opens an email, the realtime event system will process the open event and make sure that the contact matches the criterion. However, if contacts are on the segment because they match this criterion and they then do not open an email for the next two months, no event will be processed that removes them from the criterion. As a result, we have a separate “pending event” engine that creates these events and sets them to fire in the future, at the date when we know we should re-evaluate the contact’s segment membership.
Prioritization of running MapReduce jobs is a bit trickier than prioritizing MySQL-based jobs. We can no longer rely on our Operations team’s management of MySQL shards to help distribute resource contention from a hardware standpoint; instead we must do this at a software level. Thus, we have defined a number of Fair Scheduler pools that are prioritized based upon segment type. We have also built our own job queueing and management system that acts as an intermediary between clients submitting jobs and Hadoop processing them.
Because clients expect segments to stay updated automatically, whenever we find a bug that affects the logic of a given criterion, we need to reprocess all segments that contain a criterion of that type. To handle this, we built a system that will scan through all segments periodically and reprocess those with criteria for which we have recently pushed bug fixes.
Also, due to the architecture of deliverygroups, it would be too performance intensive to keep certain deliverygroup-based criteria up-to-date in realtime. As a result, we have an upkeep job daemon that reprocesses these segments on a nightly basis during periods of low system load.
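Returning to the relative time frame challenge, the “pending event” idea can be sketched as a priority queue ordered by fire time. This is a minimal in-memory Python illustration; the `PendingEvents` class, the 60-day window standing in for two months, and the sample IDs are all assumptions rather than details of the real engine.

```python
import heapq
from datetime import datetime, timedelta
from typing import List, Tuple

PendingEvent = Tuple[datetime, int, int]  # (fire_at, contact_id, segment_id)


class PendingEvents:
    def __init__(self) -> None:
        self._heap: List[PendingEvent] = []

    def schedule(self, fire_at: datetime, contact_id: int, segment_id: int) -> None:
        """Queue a future re-evaluation of one contact on one segment."""
        heapq.heappush(self._heap, (fire_at, contact_id, segment_id))

    def due(self, now: datetime) -> List[PendingEvent]:
        """Pop every event whose fire time has arrived, earliest first."""
        fired = []
        while self._heap and self._heap[0][0] <= now:
            fired.append(heapq.heappop(self._heap))
        return fired


WINDOW = timedelta(days=60)  # stand-in for "the past two months"

queue = PendingEvents()
opened_at = datetime(2024, 1, 1)
# When an open is processed, schedule a re-check for the moment that open
# falls out of the relative window.
queue.schedule(opened_at + WINDOW, contact_id=42, segment_id=7)
print(queue.due(datetime(2024, 2, 1)))  # before the window has elapsed
print(queue.due(datetime(2024, 3, 1)))  # once the window has elapsed
```

When the pending event fires, the contact is re-evaluated like any other event, which is what lets membership change without a new contact action.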
Bronto Segmentation Engine 3.0 isn’t perfect, but we’re zooming now!