Reliable Queueing in Redis (Part 1)

This post is the first in a series highlighting Bronto’s use of Redis for reliable queueing. We’re now working to open-source this library, so please check back soon!

Bronto & Redis

Here in Bronto Engineering, we sure do love us some Redis. And why not? It is fast, flexible, and has a rock solid track record in our production environment. It isn’t surprising that when we were looking for a better implementation for our distributed work queues, we decided to take a look at what Redis could provide.

The Need For Reliable Queueing

The asynchronous, distributed processing of work queues is a common pattern within our services. Most of the major features of our platform (e.g. automation workflows, real-time segmentation) rely on these queues, which help us to dynamically scale our processing capacity, and act as a protective buffer during times of high demand.

Reliable queues add an additional guarantee: once an item has been added to the queue, it will not be permanently removed until it has been confirmed as fully processed. In most implementations, this means that the consumer will interact with the queue at least twice: once to pop a piece of work from the queue, and again to mark that piece of work as complete.

Consider what might happen when you are using a queue without a reliability guarantee:

1. A producer pushes a piece of work into the queue.

[Figure: Queue Fail (1)]

2. A consumer pops the piece of work from the queue.

[Figure: Queue Fail (2)]

3. The consumer dies prior to completing processing, probably losing the work forever.

[Figure: Queue Fail (3)]

Reliable queues address this risk by keeping a copy of the original item in the queue. After some period of inactivity, that item will be recovered and requeued, making it available for another consumer to process. The result is an ‘at least once’ delivery guarantee, common to many reliable, distributed systems.

[Figure: Queue Fail (4)]

Reliable Queueing in Redis

An efficient, reliable queueing mechanism is a great building block for a distributed system, but how best to get this behavior from Redis?

We started by searching around for any existing implementations, but didn’t find anything that was truly a drop-in component. We did find a brief mention of the pattern on the documentation page for the RPOPLPUSH command, but its approach didn’t meet our needs.
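For context, the pattern from the Redis documentation looks roughly like the sketch below. It stashes each popped item in a per-consumer processing list, then removes the stashed copy with LREM once the work completes. The Jedis client, the key names, and the process() helper are our own illustration, not part of the documented pattern:

    import redis.clients.jedis.Jedis;

    // A minimal sketch of the RPOPLPUSH reliable-queue pattern from the
    // Redis docs. Client choice and key names are assumptions.
    public class RpopLpushExample {
        public static void main(String[] args) {
            try (Jedis jedis = new Jedis("localhost", 6379)) {
                // Atomically pop from the pending queue and stash the item
                // in a per-consumer processing list.
                String item = jedis.rpoplpush("queue:pending", "queue:processing");
                if (item != null) {
                    process(item);
                    // Acknowledge: remove the stashed copy once work is done.
                    jedis.lrem("queue:processing", 1, item);
                }
            }
        }

        private static void process(String item) {
            System.out.println("processing " + item);
        }
    }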

[Note: Since our original implementation, a number of posts highlighting similar techniques have popped up. It appears we weren’t the only ones interested in using Redis in this way.]

At a minimum, the features we needed were (a rough Java interface sketch follows the list):

  • Java API
  • High throughput
  • Strong (but not necessarily perfect) durability
  • Stable performance at all queue sizes
  • Single and batch variants of all major queue operations
    • Enqueue (for adding work)
    • Dequeue (for popping work)
    • Release (for permanent release of completed work)
    • Requeue (for explicit requeue of failed work)
    • Extend (for recovery period extensions on long running work)
    • Defer (for per-item deferment of work)
  • Stats (for per-item stats on enqueue time, dequeue/requeue counts, etc)
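Taken together, those requirements suggest an API shaped something like the following. This is a hypothetical sketch; the names and signatures are illustrative, not our actual Java API:

    import java.util.List;
    import java.util.Optional;

    // A hypothetical interface capturing the operations above.
    // Batch variants of release/requeue/extend/defer are omitted for brevity.
    public interface ReliableQueue<T> {
        // Single and batch enqueue; returns the surrogate key(s) assigned.
        String enqueue(T item);
        List<String> enqueue(List<T> items);

        // Pop work; it stays recoverable until released or requeued.
        Optional<Dequeued<T>> dequeue();
        List<Dequeued<T>> dequeue(int maxItems);

        void release(String id);              // permanently remove completed work
        void requeue(String id);              // explicitly requeue failed work
        void extend(String id);               // extend the recovery period
        void defer(String id, long delayMs);  // requeue with a per-item deferment

        Stats stats(String id);               // per-item stats

        interface Dequeued<T> {
            String id();
            T value();
        }

        interface Stats {
            long enqueueTimeMs();
            long lastDequeueTimeMs();
            int dequeueCount();
            long lastRequeueTimeMs();
            int requeueCount();
        }
    }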

Data Structures

Our implementation takes a basic Redis List, and enhances it with a number of additional data structures. Let’s take a closer look at those structures, and how they are used.

Reliable Queue Structures

UUIDs as Surrogate Keys

Our strategy spreads information about the state of an item in the queue across a number of Redis data structures, requiring the use of a per-item surrogate key to tie them together. UUIDs are a good choice here because (1) they are quick to generate, and (2) they can be generated by the clients in a decentralized manner.

Pending List

The Pending List holds the generated UUIDs for the items that have been enqueued(), and are ready to be processed. It is a Redis List, presenting the generated UUIDs in FIFO order.

Values Hash

The Values Hash holds the actual items that have been enqueued. It is a Redis Hash, mapping the generated UUID to the binary form of the item. This is the only representation of the original item that will appear in any of the data structures.
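A simplified enqueue() built on just these two structures might look like the sketch below. The key names ("…:values", "…:pending") are hypothetical, the item is shown as a String rather than its binary form, and in practice the two commands would need to be applied atomically (a topic for a later post in this series):

    import java.util.UUID;
    import redis.clients.jedis.Jedis;

    // A minimal sketch of enqueue() against the Pending List and Values Hash.
    // Key names and the use of Jedis are assumptions for illustration.
    public class EnqueueSketch {
        public static String enqueue(Jedis jedis, String queue, String value) {
            // Surrogate key: quick to generate, and generated client-side.
            String id = UUID.randomUUID().toString();

            // Store the only copy of the item, keyed by its UUID.
            jedis.hset(queue + ":values", id, value);

            // Publish the UUID to the Pending List (LPUSH here plus RPOP
            // on dequeue yields FIFO order).
            jedis.lpush(queue + ":pending", id);
            return id;
        }
    }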

Stats Hash

The Stats Hash records some timestamps and counts for each of the items. Specifically:

  • enqueue time
  • last dequeue time
  • dequeue count
  • last requeue time
  • requeue count

It is a Redis Hash, mapping the generated UUID to a custom data structure that holds this data in a packed, binary form.

Why keep stats on a per-item basis? We find it really useful for debugging (e.g. do we have a bad apple item that is being continuously requeued?), and for understanding how far behind we are if queues start to back up. Furthermore, the cost is only ~40 bytes per item, much smaller than our typical queued items.
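The post above doesn't spell out the packed layout, but five 64-bit fields come out to exactly 40 bytes; one plausible encoding:

    import java.nio.ByteBuffer;

    // A sketch of one way to pack the per-item stats into ~40 bytes.
    // The actual field layout is an assumption on our part.
    public final class ItemStats {
        public long enqueueTimeMs;
        public long lastDequeueTimeMs;
        public long dequeueCount;
        public long lastRequeueTimeMs;
        public long requeueCount;

        // Five longs = 40 bytes, matching the rough per-item cost above.
        public byte[] pack() {
            return ByteBuffer.allocate(5 * Long.BYTES)
                    .putLong(enqueueTimeMs)
                    .putLong(lastDequeueTimeMs)
                    .putLong(dequeueCount)
                    .putLong(lastRequeueTimeMs)
                    .putLong(requeueCount)
                    .array();
        }

        public static ItemStats unpack(byte[] raw) {
            ByteBuffer buf = ByteBuffer.wrap(raw);
            ItemStats s = new ItemStats();
            s.enqueueTimeMs = buf.getLong();
            s.lastDequeueTimeMs = buf.getLong();
            s.dequeueCount = buf.getLong();
            s.lastRequeueTimeMs = buf.getLong();
            s.requeueCount = buf.getLong();
            return s;
        }
    }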

Working Set

The Working Set holds the set of UUIDs that have been dequeued(), and are currently being processed. It is a Redis Sorted Set, and scores each of the UUIDs by a pre-calculated, millisecond timestamp. Any item that has exceeded its assigned timeout is considered abandoned, and is available to be reclaimed.
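A simplified dequeue() and recovery pass over the Working Set might look like this sketch, again with hypothetical key names, and with the caveat that a real implementation would make each step atomic:

    import redis.clients.jedis.Jedis;

    // A sketch of dequeue() and recovery against the Working Set.
    // Key names and the Jedis client are assumptions.
    public class WorkingSetSketch {
        // Pop the next UUID and score it in the Working Set by its deadline.
        public static String dequeue(Jedis jedis, String queue, long timeoutMs) {
            String id = jedis.rpop(queue + ":pending");
            if (id != null) {
                long deadline = System.currentTimeMillis() + timeoutMs;
                jedis.zadd(queue + ":working", deadline, id);
            }
            return id;
        }

        // Reclaim abandoned items: any UUID whose deadline has passed goes
        // back onto the Pending List for another consumer.
        public static void reclaim(Jedis jedis, String queue) {
            long now = System.currentTimeMillis();
            for (String id : jedis.zrangeByScore(queue + ":working", 0, now)) {
                jedis.zrem(queue + ":working", id);
                jedis.lpush(queue + ":pending", id);
            }
        }
    }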

Delay Set

The Delay Set holds the set of UUIDs that have been requeued() with a per-item deferment. It is a Redis Sorted Set, and scores each of the UUIDs by a pre-calculated, millisecond timestamp. Once the deferment timestamp has expired, the item will be returned to the Pending List.

Why support per-item deferment? We have a number of use cases where we might want to back off specific pieces of work (maybe an underlying resource is too busy) without backing off the entire queue. Per-item deferment lets us say, “requeue this item, but don’t make it available for dequeue for another n seconds.”
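A simplified defer() and a matching promotion pass might look like the following sketch, under the same assumptions as the earlier ones:

    import redis.clients.jedis.Jedis;

    // A sketch of defer() and promotion against the Delay Set. As above,
    // key names are hypothetical and the steps would be atomic in practice.
    public class DelaySetSketch {
        // "Requeue this item, but don't make it available for another n ms."
        public static void defer(Jedis jedis, String queue, String id, long delayMs) {
            jedis.zrem(queue + ":working", id);
            jedis.zadd(queue + ":delay", System.currentTimeMillis() + delayMs, id);
        }

        // Move any UUID whose deferment has expired back to the Pending List.
        public static void promote(Jedis jedis, String queue) {
            long now = System.currentTimeMillis();
            for (String id : jedis.zrangeByScore(queue + ":delay", 0, now)) {
                jedis.zrem(queue + ":delay", id);
                jedis.lpush(queue + ":pending", id);
            }
        }
    }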

Coming Soon

If this topic interests you, please be sure to check back. Future posts will cover:

  • Using Redis Lua support (previewed briefly after this list) to
    • …make our queue operations atomic
    • …make our batch operations happen in a single round-trip
  • Using producer-side buffering to increase throughput by aggregating single enqueue() operations into more efficient batch operations.
  • Using consumer-side buffering to present batched dequeue() operations as easier-to-manage, single-item dequeue() operations.
  • Using asynchronous buffering of release(), requeue(), defer(), and extend() operations to further increase throughput.
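As a small preview of that first bullet, here is a hypothetical one-script version of the earlier enqueue sketch, run atomically via Jedis’s eval(); the keys and values are placeholders:

    import redis.clients.jedis.Jedis;

    // A tiny, hypothetical glimpse of the Lua approach: one atomic script
    // performing the HSET + LPUSH pair from the earlier enqueue sketch.
    public class AtomicEnqueueGlimpse {
        public static void main(String[] args) {
            String script =
                "redis.call('HSET', KEYS[1], ARGV[1], ARGV[2]) " +
                "redis.call('LPUSH', KEYS[2], ARGV[1]) " +
                "return 1";
            try (Jedis jedis = new Jedis("localhost", 6379)) {
                // Two keys, then the UUID and value as script arguments.
                jedis.eval(script, 2, "q:values", "q:pending", "some-uuid", "some-item");
            }
        }
    }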

And remember, we are planning an open-source release of this library in the near future. We are excited to share our work with the rest of the Redis community.
Thanks for reading, and keep an eye out for our follow-up posts!
