CloudServices/Sagrada/Queuey
Revision as of 21:42, 24 January 2012

Overview

The Message Queue project is part of Project Sagrada, providing a service for applications to queue messages for clients.

Terminology

Cassandra

   Apache Cassandra, the default storage back-end for the queue

CL

   Consistency Level

DC

   Data-center

MQ

   message-queue

qdo

   Worker library for queuey

queuey

   The message queue web application Python package that provides the
   RESTful API for the message queue.

Engineers

  • Ben Bangert
  • Hanno Schlichting

Design Background

An initial survey was done of available message queue products to determine whether any of them would be appropriate for Sagrada. Generally, most MQs do not treat message persistence as important, don't have a good RESTful API, and/or aren't built with the assumption that millions of queues will be used at once. Having a configurable MQ whose message delivery guarantees and availability options can easily be toggled is quite important, so Queuey is unique in that deployment and configuration drastically alter the actual MQ one works with.

For scalability purposes, since some messages may be retained for periods of time (per the Notifications requirements) and millions of queues will be required, the initial choice for the back-end is Cassandra. As Cassandra also provides fast reads with internal caching (similar to memcached), using it in single-node mode for Socorro should work well too.

When used for Notifications, each user will have a single queue per Notification Application. This helps ensure that even for an individual user receiving many messages, the messages are partitioned by Notification Application, giving a more level distribution rather than a single deep queue.

Under the Socorro use-case, massive amounts of messages are likely intended for the same queue, which would normally result in a single extremely deep queue. Deep queues do not scale well horizontally, especially on Cassandra, which maximizes throughput across many row keys.

The other issue is that there are only two effective ways of marking a message as consumed:

1. The message may be deleted after it has been sent. The problem in this case is that there is still no guarantee it has been processed, and acquiring a lock to consume a message, plus the resulting write-back to delete it, is expensive.

2. One consumer per queue/partition. This requires some up-front guess-work about how many consumers will be around. Since consumers can consume from multiple queues/partitions at once, there need to be at least as many partitions in a queue as desired consumers. The advantage of this approach is that locking is only necessary when adding/removing consumers, to ensure one consumer per partition.

Queuey goes with the second option, and only incurs the lock during consumer addition/removal. The MessageQueue also does not track the state or last message read in queues/partitions; it is the consumer's responsibility to track how far it has read and processed successfully. An API call is available to record with the MessageQueue how far a consumer has successfully processed in a queue/partition.
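The second option can be sketched as follows. The names here are illustrative, not Queuey's actual internals: each partition has at most one consumer, and each consumer tracks its own read position by timestamp.

```python
# Sketch of option 2: one consumer per partition, with the consumer (not the
# MessageQueue) tracking its own read position. Names are hypothetical.

class PartitionConsumer:
    def __init__(self, partition):
        self.partition = partition
        self.last_processed = 0.0  # timestamp of last successfully processed message

    def consume(self, messages):
        """Process (timestamp, body) pairs newer than our recorded position."""
        for ts, body in sorted(messages):
            if ts <= self.last_processed:
                continue  # already handled on an earlier read
            self.handle(body)
            # Only advance after successful processing, so a crash here means
            # the message is re-read on restart rather than lost.
            self.last_processed = ts

    def handle(self, body):
        pass  # application-specific work goes here

consumer = PartitionConsumer(partition=0)
consumer.consume([(1.0, "a"), (2.0, "b")])
consumer.consume([(1.5, "late"), (3.0, "c")])  # 1.5 is skipped: already read past it
```

Note that the skipped late message illustrates why the enforced read delay described below matters when consumers track position by timestamp.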

Architecture

Queue Web Application

   queuey (Python web application providing RESTful API)

Queue Storage Backend

   Cassandra

Coordination Service

   (Only used when consuming from the queue)
   Apache Zookeeper

Queue Consumption

   * Consumers coordinate with coordination service
   * Consumers split queues/partitions amongst themselves
   * Consumers record in coordination service the farthest they've processed
     in a queue/partition
   * Consumers rebalance queue/partition allocation when consumers are added or
     removed using coordination service
   * Consumption and rebalancing is done entirely client-side
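The client-side rebalancing step above can be sketched with a deterministic assignment function; this is illustrative, not Qdo's actual algorithm. Because every consumer sorts the membership list the coordination service provides and computes the same allocation, no further locking is needed once membership is agreed on.

```python
# Deterministic partition assignment: each consumer derives its share from the
# shared membership list, so all consumers agree without extra locking.
# Illustrative sketch only.

def assign_partitions(partitions, consumers, me):
    """Return the partitions `me` should consume."""
    members = sorted(consumers)
    idx = members.index(me)
    # Round-robin: partition p goes to member number (p mod len(members)).
    return [p for p in sorted(partitions) if p % len(members) == idx]

parts = range(6)
print(assign_partitions(parts, ["a", "b"], "a"))       # [0, 2, 4]
# A third consumer joins; everyone recomputes and the load rebalances:
print(assign_partitions(parts, ["a", "b", "c"], "a"))  # [0, 3]
```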

Queuey is composed of two core parts:

  • A web application (handles the RESTful API)
  • Storage back-end (used by the web application to store queues/messages)

The storage back-end is pluggable, and the current default storage back-end is Cassandra.

Unlike other MQ products, Queuey does not hand out messages amongst all clients reading from the same queue. Every client is responsible for tracking how far it has read into the queue. If only a single client should see a message, the queue should be partitioned, and clients should decide who reads which partition of the queue.

Different performance and message guarantee characteristics can be configured by changing the deployment strategy and Cassandra's replication and read/write options. Multiple Queuey deployments will therefore be necessary, and apps should use the deployment with the operational characteristics they need.

Since messages are stored and read by timestamp from Cassandra, and Cassandra only has eventual consistency, there is an enforced delay in how soon a message is available for reading, to ensure a consistent picture of the queue. This helps ensure that written messages show up when reading the queue, avoiding 'lost' messages caused by reading past the point where they appear in the queue.

For more on the probabilities involved with various replication factors and differing read/write CLs, see http://www.eecs.berkeley.edu/~pbailis/projects/pbs/#demo

In the event that clients delete messages from their queue as they read them, the delay is unimportant. Enforcing a proper delay is only required when clients read but never delete messages (and thus track how far into the queue they've read by timestamp).
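The enforced delay can be pictured as a read horizon: a reader only sees messages older than "now minus the delay", so writes still propagating between replicas cannot be skipped past. A minimal sketch of the idea (not Queuey's implementation):

```python
import time

def visible_messages(messages, delay, now=None):
    """Return messages whose timestamp is at or before the read horizon.

    `messages` is a list of (timestamp, body) pairs; `delay` is the enforced
    propagation window in seconds. Illustrative only.
    """
    now = time.time() if now is None else now
    horizon = now - delay
    return [(ts, body) for ts, body in messages if ts <= horizon]

msgs = [(100.0, "settled"), (109.9, "still propagating")]
# With a 0.5s delay at t=110, the second message is held back; a reader that
# records 100.0 as its position will still see it on the next poll.
print(visible_messages(msgs, delay=0.5, now=110.0))  # [(100.0, 'settled')]
```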

Queues that are to be consumed must be declared up-front as partitioned queues. The number of partitions must also be specified, and new messages will be randomly partitioned. If messages should be processed in order, they can be inserted into a single partition to enforce ordering. All messages that are randomly partitioned should be considered loosely ordered.
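The partitioning rules above can be sketched as follows (illustrative names, not Queuey's internals): new messages land on a random partition unless the producer pins one to preserve ordering.

```python
import random

def choose_partition(num_partitions, pinned=None):
    """Pick the partition for a new message.

    Random by default (loose ordering across the queue); pass `pinned` to keep
    related messages in one partition, preserving their relative order.
    Illustrative sketch only.
    """
    if pinned is not None:
        return pinned % num_partitions
    return random.randrange(num_partitions)

# Random placement spreads load across the declared partitions:
assert 0 <= choose_partition(4) < 4
# Pinning yields a stable partition, so these messages stay ordered:
assert choose_partition(4, pinned=7) == choose_partition(4, pinned=7) == 3
```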

Queue Workers

A worker library called Qdo handles coordinating and processing messages off queues in a job processing setup. The Qdo library utilizes Apache Zookeeper for worker and queue coordination.

Example Deployment Configurations

Multiple DC Reliability

This setup is for messages that are critical to deliver; performance is substantially slower, as writes will not return until a quorum of storage nodes in each data-center acknowledges the message was written.

Depending on how urgent it is for clients to see the message, the read CL can be between ONE and LOCAL_QUORUM. Higher read availability is achieved by using ONE, though, with only a slight delay before a written message is seen regardless of data-center.

Using a read CL of ONE allows an entire data-center to become unavailable while messages are still delivered; however, no new messages can be written, as all data-centers must have quorums available to write messages. Changing the write CL to a lower value adversely affects how clients may read the queue, as they may need to track what has been read to ensure they don't miss messages that are still propagating.

Queuey

- Deploy to webheads in each data-center

Storage back-end

- Select Cassandra
- Deploy Cassandra machines (3+) in each data-center
- Set write CL to EACH_QUORUM
- Set read CL between ONE and LOCAL_QUORUM
- Set delay as appropriate for the read/write CLs
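As a quick sanity check on the node counts above: Cassandra's quorum is floor(replicas / 2) + 1, which is why 3+ machines per data-center lets EACH_QUORUM writes survive one node being down in any DC. A minimal sketch:

```python
def quorum(replicas):
    """Cassandra's quorum size for a given replication factor."""
    return replicas // 2 + 1

# With 3 replicas per DC, EACH_QUORUM needs 2 acks in every DC, so each
# data-center tolerates one node being down while writes still succeed.
assert quorum(3) == 2
assert quorum(5) == 3
```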

High Throughput, Occasional Unavailability of Messages, Single DC

In this setup, Queuey behaves like Apache Kafka. The queuey service runs on the same nodes as Cassandra, and each node runs Cassandra as a single node. This provides no data durability beyond the disks, so if message persistence is important, the drives should be RAIDed. If a node goes down, the messages on that node are unavailable until the node returns.

Applications writing messages should hit a load balancer that has the queuey nodes behind it, so writes will be unaffected by individual machine outages and message producers will not need configuration updates to add/remove queuey nodes.

Clients reading queues will have to know the individual hostnames of the queuey nodes and track how far they've read into the queue per-host, as well as be able to communicate directly with the queuey nodes (bypassing the load balancer).

Queuey

- Deploy to desired nodes

Storage back-end

- Select Cassandra
- Deploy as a single-node instance to every queuey node
- Set read/write CL to ONE, as there is only one node
- No delay needed
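Since readers in this setup bypass the load balancer and each queuey node holds different messages, a client has to keep a read position per host. A hypothetical sketch of that bookkeeping (hostnames are placeholders):

```python
# Per-host read positions for the Kafka-like, single-node-Cassandra setup.
# Hostnames are hypothetical; only the tracking idea comes from the text.

class PerHostReader:
    def __init__(self, hosts):
        self.positions = {host: 0.0 for host in hosts}

    def record(self, host, messages):
        """Advance the stored position for `host` past the (ts, body) pairs read."""
        if messages:
            self.positions[host] = max(ts for ts, _ in messages)

reader = PerHostReader(["queuey1.example.com", "queuey2.example.com"])
reader.record("queuey1.example.com", [(5.0, "a"), (7.0, "b")])
# queuey2 is still at 0.0; an outage of queuey1 only stalls its own messages.
print(reader.positions["queuey1.example.com"])  # 7.0
```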

Good Throughput, Available Messages, Single DC

This is a good generic setup that provides message durability in the event a few nodes are lost. Webheads run queuey, and all connect to the same Cassandra cluster.

Message guarantees can be tweaked by altering the read/write CLs and the enforced message availability delay. For example, to raise availability the read/write CL can be set to ONE, with a delay that provides a fairly high 99% chance of seeing a message within 50ms of writing it (assuming no nodes go down).

See http://www.eecs.berkeley.edu/~pbailis/projects/pbs/#demo for more details on tweaking the delay and CLs to achieve the desired message delivery guarantees.

Queuey

- Deploy to webheads

Storage back-end

- Select Cassandra
- Deploy a Cassandra cluster of 3+ machines
- Set write CL between ONE and QUORUM
- Set read CL between ONE and QUORUM
- Set delay as appropriate for the read/write CLs
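The three example deployments can be condensed into one data structure for comparison; the values are taken from the sections above, and "tuned" means the enforced message-availability delay is set to match the chosen CLs rather than a fixed number.

```python
# Summary of the three example deployment profiles described above.

DEPLOYMENTS = {
    "multi_dc_reliability": {
        "cassandra": "3+ nodes per data-center",
        "write_cl": "EACH_QUORUM",
        "read_cl": "ONE to LOCAL_QUORUM",
        "delay": "tuned",
    },
    "high_throughput_single_dc": {
        "cassandra": "single node per queuey node",
        "write_cl": "ONE",
        "read_cl": "ONE",
        "delay": "none",
    },
    "good_throughput_single_dc": {
        "cassandra": "one cluster, 3+ nodes",
        "write_cl": "ONE to QUORUM",
        "read_cl": "ONE to QUORUM",
        "delay": "tuned",
    },
}

# Only the single-node profile needs no delay, since there is no replication.
assert DEPLOYMENTS["high_throughput_single_dc"]["delay"] == "none"
```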

Initial User Requirements

Notifications

The Notifications project needs Queuey for storing messages on behalf of users that want to receive them. Each user gets their own queue.

If a user needs to get notifications for general topics, the Notifications application will create a queue and clients will poll multiple queues.

The first version could be considered a message store rather than a queue, as it supports a much richer set of query semantics and does not let public consumers remove messages. Messages can be removed by the App deleting the entire queue, or by expiring.

Requirements:

  • Service App can create queues
  • Service App can add messages to queue
  • Messages on queues expire
  • Clients may read any queue they are aware of


Socorro

The second version allows authenticated applications to queue messages and for clients to consume them. This model allows for a worker model where jobs are added to a queue that multiple clients may be watching, and each message will be given to an individual client.

Optionally, the client can 'reserve' a message to indicate it would like it; if the client does not then verify that it has successfully handled the message, the message will be put back in the queue, or onto a fail/retry queue if desired.

Requirements:

  • Service App can create queues, which can be partitioned
  • Service App can add messages to queue, and specify partition to retain ordering
  • Clients can ask for information about how many partitions a queue has
  • Clients may read any queue, and its partitions that they are aware of

API

Applications allowed to use the Message Queue will be given an application key that must be sent with every request, and their IP must be on an accepted IP list for the given application key.

All methods must include the header 'X-Application-Name', which indicates the name of the application using the queue. Queues are unique within an application.

For methods requiring authentication (internal apps), the application key must be sent as an HTTP header named 'ApplicationKey'.
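Putting the header rules together, a request to create a queue might be assembled like this. Only the header names, path, and parameter names come from the API described here; the application name, key value, and helper function are hypothetical.

```python
from urllib.parse import urlencode

def build_create_queue(app_name, app_key, queue_name=None, partitions=1):
    """Assemble an authenticated POST /queue request as (method, path, headers, body).

    Hypothetical helper: it only shows which headers and parameters go where,
    it does not send anything.
    """
    headers = {
        "X-Application-Name": app_name,  # required on every request
        "ApplicationKey": app_key,       # required for authenticated methods
    }
    body = urlencode({"queue_name": queue_name or "", "partitions": partitions})
    return ("POST", "/queue", headers, body)

method, path, headers, body = build_create_queue(
    "notifications", "hypothetical-key", queue_name="user-123", partitions=4)
print(method, path)  # POST /queue
```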

Internal Apps

These methods are authenticated by IP, and are intended for use by Services Applications.


POST /queue

   Creates a new queue
   Params:
       queue_name (Optional) - Name of the queue to create
       partitions (Optional) - How many partitions the queue should have (defaults to 1)

DELETE /queue/{queue_name}

   Deletes a given queue created by the App.
   Params:
       delete (Optional) - If set to false, the queue contents will be deleted
                           but the queue itself will remain registered.

POST /queue/{queue_name}

   Create a message on the given queue. The contents are expected to be
   a JSON object.
   
   Raises an error if the queue does not exist.

Clients

GET /queue/{queue_name}

   Returns messages for the queue.
   Params:
       since_timestamp (Optional) - Return all messages newer than this timestamp,
                                        formatted as seconds since epoch in GMT
       limit           (Optional) - Return at most N messages, defaults to 100
       order           (Optional) - 'descending' or 'ascending', defaults to descending
   
   Messages are returned in order of newest to oldest.
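A client polling for new messages would pass its last-seen timestamp back as since_timestamp on the next read. A sketch of building that request path (the queue name and helper are hypothetical; the parameter names and defaults mirror the API above):

```python
from urllib.parse import urlencode

def build_read_request(queue_name, since_timestamp=None, limit=100, order="descending"):
    """Build the path and query string for GET /queue/{queue_name}.

    Defaults mirror the documented ones (limit 100, descending order).
    Hypothetical helper for illustration only.
    """
    params = {"limit": limit, "order": order}
    if since_timestamp is not None:
        params["since_timestamp"] = since_timestamp
    return "/queue/%s?%s" % (queue_name, urlencode(sorted(params.items())))

# First poll, then poll again from the newest timestamp we processed:
print(build_read_request("user-123"))  # /queue/user-123?limit=100&order=descending
print(build_read_request("user-123", since_timestamp=1322521547, order="ascending"))
```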

GET /queue/{queue_name}/info

   Returns information about the queue.
   Example response:
       {
           'partitions': 4,
           'created': 1322521547
       }

Use Cases

Notifications

The new version of Notifications will use the MQ to store messages for users from various websites and allow users to poll the MQ directly for new notifications they have allowed.

  • Internal app adds messages to a queue by queue name
  • Internal app can delete queues by name
  • Devices can read a queue by name
  • Devices can read the X most recent messages
  • Devices can read messages since XXX
  • Messages are encoded as JSON

Socorro

Socorro is Mozilla's crash reporting service, receiving in excess of 8 thousand crashes per second at peak. The MQ will be used to hold crash reports as they come in, and consumers will use it to process the crash reports.

  • Application Key and Application Queues will be created during setup.
  • Webheads will add messages to the queue by queue name, with enough
    partitions set up for the various consumers.
  • Consumers will process crashes off the queues