|
|
Line 1: |
Line 1: |
| = Overview =
| |
|
| |
|
| The '''Message Queue''' project is part of [[Services/Sagrada|Project Sagrada]], providing a service for applications to queue messages for clients.
| |
|
| |
| = Project =
| |
|
| |
| == Engineers ==
| |
|
| |
| * Ben Bangert
| |
| * Hanno Schlichting
| |
|
| |
| == User Requirements ==
| |
|
| |
| === Phase 1 ===
| |
|
| |
| '''NOTE: This phase is being somewhat skipped at the moment to Phase 2'''
| |
|
| |
| The first version of the MQ will focus on providing a useful service for the
| |
| [[Services/Notifications|Notifications project]] to enqueue messages for many
| |
| queues and for clients to poll for messages on a given queue.
| |
|
| |
| No message aggregation is done in this version, if a client needs to get
| |
| messages for more than one subscription it must poll each queue and aggregate
| |
| them.
| |
|
| |
| The first version could be considered a '''Message Store''' rather than a
| |
| ''queue'' as it supports a much richer set of query semantics and does not let
| |
| public consumers remove messages. Messages can be removed via the entire queue
| |
| being deleted by the App or by expiring.
| |
|
| |
| Requirements:
| |
|
| |
| * Service App can create queues
| |
| * Service App can add messages to queue
| |
| * Messages on queues expire
| |
| * Clients may read any queue they are aware of
| |
|
| |
|
| |
| === Phase 2 ===
| |
|
| |
| The second version allows authenticated applications to queue messages and for
| |
| clients to consume them. This model allows for a worker model where jobs are
| |
| added to a queue that multiple clients may be watching, and each message
| |
| will be given to an individual client.
| |
|
| |
| Optionally, the client can 'reserve' a message to indicate it would like it,
| |
| and if the client does not verify that it has successfully handled the message
| |
| it will be put back in the queue or to a fail/retry queue if desired.
| |
|
| |
| Requirements:
| |
|
| |
| * Service App can create queues, which can be partitioned
| |
| * Service App can add messages to queue, and specify partition to retain ordering
| |
| * Clients can ask for information about how many partitions a queue has
| |
| * Clients may read any queue, and its partitions that they are aware of
| |
|
| |
| == Design Background ==
| |
|
| |
| For scalability purposes, since some messages may be retained for periods of time
| |
| per Phase 1 requirements, the initial choice of a backend is Cassandra. However,
| |
| for Phase 2 requirements, Cassandra is missing the ability to manage and coordinate
| |
| queue consumers so Zookeeper is being used for distributed synchronization.
| |
|
| |
| When used for Notifications, each user will have a single queue per Notification
| |
| Application, this helps ensure that even for an individual user receiving many
| |
| messages, they are partitioned by the Notification Application to ensure
| |
| a more level distribution rather than a single deep queue.
| |
|
| |
| Under the Phase 2 use-case, it is most likely desired that massive amounts of
| |
| messages may be intended for the same queue, which would normally result in a
| |
| single extremely deep queue. Deep queue's do not scale well horizontally,
| |
| especially when using Cassandra which maximizes through-put across many
| |
| row-keys.
| |
|
| |
| The other issue is that to consume messages, there are only two effective ways
| |
| of marking a message as consumed:
| |
|
| |
| 1. The message may be deleted after it has been sent. The only problem in
| |
| this case is that there is still no guarantee it has been processed, and
| |
| acquiring a lock to consume a message, and the resulting write-back to
| |
| delete it is expensive.
| |
|
| |
| 2. One consumer per queue/partition. This requires some guess-work up-front
| |
| about how many consumers will be around. Since consumers can consume from
| |
| multiple queue/partition's at once, there needs to be at least as many
| |
| partitions for a queue, as desired consumers. The advantage with this
| |
| approach is that locking is only necessary when adding/removing consumers
| |
| to ensure one consumer per partition.
| |
|
| |
| This MessageQueue project goes with the second option, and only incurs the
| |
| lock during consumer addition/removal. The MessageQueue also does not track
| |
| the state or last message read in the queue/partition's, it is the consumers
| |
| responsibility to track how far it has read and processed successfully. There
| |
| is an API call available to record with the MessageQueue how far a consumer
| |
| has successfully processed in a queue/partition.
| |
|
| |
| When reading messages for a processing workload, they should be read in batches
| |
| for performance to avoid network latency overhead.
| |
|
| |
| Since messages are stored and read by timestamp from Cassandra, and Cassandra
| |
| only has eventual consistency, there is an enforced delay in how soon a message
| |
| is availalable for reading to ensure a consistent picture of the queue. This
| |
| is no less than 5 seconds after insertion, and at most about 15 seconds.
| |
|
| |
| When using queue's that are to be consumed, they must be declared up-front as
| |
| a ''partioned'' queue. The amount of partitions should also be specified, and
| |
| new messages will be randomly partioned. If messages should be processed in
| |
| order, they can be inserted into a single partition to enforce ordering. All
| |
| messages that are randomly partitioned should be considered loosely ordered.
| |
|
| |
| == Architecture ==
| |
|
| |
| '''Queue Storage Backend'''
| |
|
| |
| Cassandra
| |
|
| |
| '''Coordination Service'''
| |
|
| |
| (Only used when consuming from the queue)
| |
|
| |
| Apache Zookeeper
| |
|
| |
| '''Queue Consumption'''
| |
|
| |
| * Consumers coordinate with coordination service
| |
| * Consumers split queues/partitions amongst themselves
| |
| * Consumers record in coordination service the farthest they've processed
| |
| in a queue/partition
| |
| * Consumers rebalance queue/partition allocation when consumers are added or
| |
| removed using coordination service
| |
| * Consumption and rebalancing is done entirely client-side
| |
|
| |
| == Proposed API ==
| |
|
| |
| Applications allowed to use the '''Message Queue'''
| |
| will be given an application key that must be sent with every request, and
| |
| their IP must be on an accepted IP list for the given application key.
| |
|
| |
| All methods must include the header 'X-Application-Name' which indicates the name of the
| |
| application using the queue. Queue's are unique within an application.
| |
|
| |
| For methods requiring authentication (internal apps), the application key must
| |
| be sent as a HTTP header named 'ApplicationKey'.
| |
|
| |
| === Internal Apps ===
| |
|
| |
| These methods are authenticated by IP, and are intended for use by Services Applications.
| |
|
| |
|
| |
| '''POST /queue'''
| |
|
| |
| Creates a new queue
| |
|
| |
| Params:
| |
| ''queue_name'' (Optional) - Name of the queue to create
| |
| ''partitions'' (Optional) - How many partitions the queue should have (defaults to 1)
| |
|
| |
| '''DELETE /queue/{queue_name}'''
| |
|
| |
| Deletes a given queue created by the App.
| |
|
| |
| Params:
| |
| ''delete'' (Optional) - If set to ''false'', then the queue contents will be deleted,
| |
| but the queue will remain registered.
| |
|
| |
| '''POST /queue/{queue_name}'''
| |
|
| |
| Create a message on the given queue. Contents is expected to be
| |
| a JSON object.
| |
|
| |
| Raises an error if the queue does not exist.
| |
|
| |
| === Clients ===
| |
|
| |
| '''GET /queue/{queue_name}'''
| |
|
| |
| Returns messages for the queue.
| |
|
| |
| Params:
| |
| ''since_timestamp'' (Optional) - All messages newer than this timestamp, should be
| |
| formatted as seconds since epoch in GMT
| |
| ''limit'' (Optional) - Only return N amount of messages, defaults to 100
| |
| ''order'' (Optional) - 'descending' or 'ascending', defaults to descending
| |
|
| |
| Messages are returned in order of newest to oldest.
| |
|
| |
| '''GET /queue/{queue_name}/info'''
| |
|
| |
| Returns information about the queue.
| |
|
| |
| Example response:
| |
| {
| |
| 'partitions': 4,
| |
| 'created': 1322521547
| |
| }
| |
|
| |
| == Use Cases ==
| |
|
| |
| === Notifications ===
| |
|
| |
| The new version of [[Services/Notifications/Push|Notifications]] will use the
| |
| MQ to store messages for users from various websites and allow users to poll
| |
| the MQ directly for new notifications they have allowed.
| |
|
| |
| * Internal app add's messages to a queue by queue name
| |
| * Internal app can delete queues by name
| |
| * Devices can read a queue by name
| |
| * Devices can read the X most recent messages
| |
| * Devices can read messages since XXX
| |
| * Messages are encoded as JSON
| |
|
| |
| === Socorro ===
| |
|
| |
| Socorro is Mozilla's crash reporting service, receiving in excess of 8 thousand
| |
| crashes per second during peaks. The MQ will be utilized to hold crash reports as
| |
| they come in, and used by consumers to process the crash reports.
| |
|
| |
| * Application Key and Application Queue's will be created during setup.
| |
| * Webheads will be adding messages to the queue by queue name, with enough parititons
| |
| setup for the various consumers.
| |
| * Consumers will process crashes off the queues
| |