CloudServices/Sagrada/Queuey
Overview
The Message Queue project is part of Project Sagrada, providing a service for applications to queue messages for clients.
Project
Engineers
- Ben Bangert
- Hanno Schlichting
User Requirements
Phase 1
NOTE: This phase is largely being skipped at the moment in favor of Phase 2
The first version of the MQ will focus on providing a useful service for the Notifications project to enqueue messages for many queues and for clients to poll for messages on a given queue.
No message aggregation is done in this version; if a client needs to get messages for more than one subscription, it must poll each queue and aggregate the results itself.
The first version could be considered a Message Store rather than a queue, as it supports a much richer set of query semantics and does not let public consumers remove messages. Messages are removed only when the App deletes the entire queue or when they expire.
Requirements:
- Service App can create queues
- Service App can add messages to queue
- Messages on queues expire
- Clients may read any queue they are aware of
Phase 2
The second version allows authenticated applications to queue messages and for clients to consume them. This model allows for a worker model where jobs are added to a queue that multiple clients may be watching, and each message will be given to an individual client.
Optionally, a client can 'reserve' a message to indicate it would like to handle it; if the client does not then confirm that it has successfully handled the message, the message is put back on the queue, or moved to a fail/retry queue if desired.
Requirements:
- Service App can create queues, which can be partitioned
- Service App can add messages to queue, and specify partition to retain ordering
- Clients can ask for information about how many partitions a queue has
- Clients may read any queue, and its partitions that they are aware of
Design Background
For scalability purposes, since some messages may be retained for periods of time per Phase 1 requirements, the initial choice of a backend is Cassandra. However, for Phase 2 requirements, Cassandra lacks the ability to manage and coordinate queue consumers, so Zookeeper is being used for distributed synchronization.
When used for Notifications, each user will have a single queue per Notification Application. This ensures that even for an individual user receiving many messages, the messages are partitioned by Notification Application, giving a more level distribution rather than a single deep queue.
Under the Phase 2 use-case, it is likely that massive numbers of messages will be intended for the same queue, which would normally result in a single extremely deep queue. Deep queues do not scale well horizontally, especially when using Cassandra, which maximizes throughput across many row-keys.
The other issue is that to consume messages, there are only two effective ways of marking a message as consumed:
1. The message may be deleted after it has been sent. The problem in this case is that there is still no guarantee it has been processed, and acquiring a lock to consume a message, plus the resulting write-back to delete it, is expensive.
2. One consumer per queue/partition. This requires some guess-work up-front about how many consumers will be around. Since consumers can consume from multiple queue/partitions at once, there need to be at least as many partitions per queue as desired consumers. The advantage of this approach is that locking is only necessary when adding/removing consumers, to ensure one consumer per partition.
This MessageQueue project goes with the second option, and only incurs the lock during consumer addition/removal. The MessageQueue also does not track the state or last message read in each queue/partition; it is the consumer's responsibility to track how far it has read and processed successfully. An API call is available to record with the MessageQueue how far a consumer has successfully processed in a queue/partition.
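Since read state lives entirely on the consumer side, each consumer needs to remember the farthest point it has successfully processed per queue/partition. The following is a minimal client-side sketch of that bookkeeping; the class and method names are illustrative, not part of the Queuey API.

```python
class ConsumerCheckpoint:
    """Tracks the farthest successfully processed point per queue/partition."""

    def __init__(self):
        # Maps (queue_name, partition) -> last processed timestamp.
        self._positions = {}

    def record(self, queue_name, partition, timestamp):
        """Record progress, never moving backwards on a stale update."""
        key = (queue_name, partition)
        self._positions[key] = max(self._positions.get(key, 0), timestamp)

    def position(self, queue_name, partition):
        """Timestamp to use as the starting point on the next read."""
        return self._positions.get((queue_name, partition), 0)


cp = ConsumerCheckpoint()
cp.record("crash_reports", 3, 1322521547)
cp.record("crash_reports", 3, 1322521500)  # stale update is ignored
print(cp.position("crash_reports", 3))     # 1322521547
```

A consumer would periodically push these positions to the MessageQueue's progress-recording API call so a replacement consumer can resume from the right place.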
When reading messages for a processing workload, they should be read in batches to avoid network latency overhead.
Since messages are stored and read by timestamp from Cassandra, and Cassandra only has eventual consistency, there is an enforced delay before a message is available for reading, to ensure a consistent picture of the queue. This is no less than 5 seconds after insertion, and at most about 15 seconds.
Queues that are to be consumed must be declared up-front as partitioned queues. The number of partitions should also be specified, and new messages will be randomly partitioned. If messages must be processed in order, they can be inserted into a single partition to enforce ordering. All messages that are randomly partitioned should be considered loosely ordered.
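The partitioning trade-off above can be sketched as a producer-side helper; this is illustrative only (the function name and the ordering-key idea are assumptions, not part of the documented API). Random partitioning spreads load across row-keys, while pinning related messages to one partition preserves their ordering at the cost of queue depth.

```python
import random

def choose_partition(num_partitions, ordering_key=None):
    """Pick a partition in [1, num_partitions].

    With no ordering_key, messages are spread randomly (loosely ordered).
    With an ordering_key, the same key always maps to the same partition,
    so those messages keep their insertion order relative to each other.
    """
    if ordering_key is None:
        return random.randint(1, num_partitions)
    return (hash(ordering_key) % num_partitions) + 1
```

Inserting every message for one logical stream with the same ordering_key is equivalent to the single-partition ordering approach described above.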
Architecture
Queue Storage Backend
Cassandra
Coordination Service
(Only used when consuming from the queue)
Apache Zookeeper
Queue Consumption
- Consumers coordinate with the coordination service
- Consumers split queues/partitions amongst themselves
- Consumers record in the coordination service the farthest they've processed in a queue/partition
- Consumers rebalance queue/partition allocation when consumers are added or removed, using the coordination service
- Consumption and rebalancing are done entirely client-side
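The split-and-rebalance steps above can be sketched as a deterministic assignment function. This is illustrative only; the real coordination (membership, locks, watches) happens through Zookeeper, but given the same sorted view of live consumers, every consumer can independently compute the same disjoint partition assignment.

```python
def assign_partitions(partitions, consumers, me):
    """Return the partitions owned by consumer `me`.

    partitions: list of (queue_name, partition) tuples
    consumers:  ids of all consumers currently registered
    me:         this consumer's id

    Round-robins sorted partitions over sorted consumers, so adding or
    removing a consumer changes every consumer's result consistently.
    """
    members = sorted(consumers)
    index = members.index(me)
    return [p for i, p in enumerate(sorted(partitions))
            if i % len(members) == index]
```

When a consumer joins or leaves, each remaining consumer re-runs the computation against the new membership list and picks up its new share.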
Proposed API
Applications allowed to use the Message Queue will be given an application key that must be sent with every request, and their IP must be on an accepted IP list for the given application key.
All methods must include the header 'X-Application-Name', which indicates the name of the application using the queue. Queues are unique within an application.
For methods requiring authentication (internal apps), the application key must be sent as an HTTP header named 'ApplicationKey'.
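A minimal sketch of building these headers (the header names come from this document; the application name and key values are made up):

```python
def build_headers(app_name, app_key=None):
    """Headers for a Message Queue request.

    X-Application-Name is required on every call; ApplicationKey only
    on authenticated (internal app) methods.
    """
    headers = {"X-Application-Name": app_name}
    if app_key is not None:
        headers["ApplicationKey"] = app_key
    return headers


print(build_headers("notifications", "example-app-key"))
```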
Internal Apps
These methods are authenticated by IP, and are intended for use by Services Applications.
POST /queue
Creates a new queue
Params:
- queue_name (Optional) - Name of the queue to create
- partitions (Optional) - How many partitions the queue should have (defaults to 1)
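As a sketch, a client might build the creation request like this (the parameter names come from this document; sending them form-encoded is an assumption, as the wire format is not specified here):

```python
from urllib.parse import urlencode

def create_queue_request(queue_name=None, partitions=1):
    """Return (method, path, body) for POST /queue.

    Both parameters are optional; partitions defaults to 1 server-side,
    so it is only sent when it differs.
    """
    params = {}
    if queue_name is not None:
        params["queue_name"] = queue_name
    if partitions != 1:
        params["partitions"] = partitions
    return ("POST", "/queue", urlencode(params))
```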
DELETE /queue/{queue_name}
Deletes a given queue created by the App.
Params:
- delete (Optional) - If set to false, the queue contents will be deleted, but the queue will remain registered
POST /queue/{queue_name}
Creates a message on the given queue. The content is expected to be a JSON object. Raises an error if the queue does not exist.
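A minimal sketch of enqueuing a message (the message fields below are made up; the document only requires the body to be a JSON object):

```python
import json

def enqueue_request(queue_name, message):
    """Return (method, path, body) for POST /queue/{queue_name}."""
    return ("POST", "/queue/%s" % queue_name, json.dumps(message))


method, path, body = enqueue_request("notices", {"type": "comment", "text": "hi"})
```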
Clients
GET /queue/{queue_name}
Returns messages for the queue.
Params:
- since_timestamp (Optional) - Return only messages newer than this timestamp, formatted as seconds since epoch in GMT
- limit (Optional) - Return at most N messages (defaults to 100)
- order (Optional) - 'descending' or 'ascending' (defaults to descending)
Messages are returned in order of newest to oldest.
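A sketch of building the read URL from these parameters (parameter names come from this document; the helper itself is illustrative):

```python
from urllib.parse import urlencode

def read_messages_url(queue_name, since_timestamp=None, limit=None, order=None):
    """Build the path and query string for GET /queue/{queue_name}.

    Only parameters that are actually supplied are included, so server
    defaults (limit=100, order=descending) apply otherwise.
    """
    params = {}
    if since_timestamp is not None:
        params["since_timestamp"] = since_timestamp
    if limit is not None:
        params["limit"] = limit
    if order is not None:
        params["order"] = order
    path = "/queue/%s" % queue_name
    if params:
        path += "?" + urlencode(params)
    return path
```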
GET /queue/{queue_name}/info
Returns information about the queue.
Example response: { "partitions": 4, "created": 1322521547 }
Use Cases
Notifications
The new version of Notifications will use the MQ to store messages for users from various websites and allow users to poll the MQ directly for new notifications they have allowed.
- Internal app adds messages to a queue by queue name
- Internal app can delete queues by name
- Devices can read a queue by name
- Devices can read the X most recent messages
- Devices can read messages since XXX
- Messages are encoded as JSON
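The device-side reads above amount to a polling loop that remembers the newest timestamp it has seen. Here is a minimal sketch; fetch_messages stands in for an HTTP GET against /queue/{queue_name}, and the 'timestamp' field on each message is an assumption about the response shape.

```python
def poll(fetch_messages, queue_name, last_seen=0):
    """Fetch messages newer than last_seen; return (messages, new last_seen).

    Passing the returned last_seen back in on the next call gives the
    'messages since XXX' behaviour without re-reading old messages.
    """
    messages = fetch_messages(queue_name, since_timestamp=last_seen)
    if messages:
        last_seen = max(m["timestamp"] for m in messages)
    return messages, last_seen
```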
Socorro
Socorro is Mozilla's crash reporting service, receiving in excess of 8 thousand crashes per second during peaks. The MQ will be used to hold crash reports as they arrive, and consumers will read from it to process the crash reports.
- The Application Key and Application Queues will be created during setup.
- Webheads will add messages to the queue by queue name, with enough partitions set up for the various consumers.
- Consumers will process crashes off the queues