Distributed Message Queue
Topics:
Producer delivery ack mechanism: at-least-once, at-most-once, exactly-once.
Consumer receipt: at-least-once, at-most-once, exactly-once.
Ordering guarantee.
Retries?
Consumer rebalancing?
Topics? Partitions? Brokers? Replicas?
Message consumption: push or pull?
Functional Requirements
Producers can send messages to the queue and get an immediate response.
Consumers can subscribe to topics and fetch messages from the queue.
Support topics? Yes, so a message can be consumed repeatedly.
Support ordering? Yes.
Delivery guarantee: messages can be consumed repeatedly or only once.
At-least-once, at-most-once, or exactly-once, configurable.
Message size? KB range.
Non-functional Requirements
Highly available
Low latency
Durable: messages should not be lost.
Data should be persisted on disk and replicated on multiple nodes.
API
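A rough sketch of what the client-facing API could look like, based on the functional requirements above; all interface and method names here are hypothetical, not taken from any specific library.

```java
import java.util.List;

// Hypothetical client-facing API sketched from the functional requirements;
// names are illustrative only.
public interface MessageQueueClient {

    record SendResult(String topic, int partition, long offset) {}
    record ConsumedMessage(byte[] key, byte[] value, int partition, long offset) {}

    // Producer: send a message and get an immediate response with its position.
    SendResult send(String topic, byte[] key, byte[] value);

    // Consumer: subscribe to a topic as part of a consumer group.
    void subscribe(String topic, String consumerGroup);

    // Consumer: pull up to maxMessages starting from the last committed offset.
    List<ConsumedMessage> poll(int maxMessages);

    // Consumer: commit the consumed offset back to the broker / state storage.
    void commitOffset(String topic, int partition, long offset);
}
```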
Data Schema
Read/Write pattern:
Write-heavy, read-heavy
No update or delete operations.
Sequential read/write access.
A general-purpose store can't efficiently support both a write-heavy and a read-heavy access pattern.
A WAL (write-ahead log) has a purely sequential read/write access pattern.
Disk performance for sequential access is very good.
For the WAL, a file cannot grow infinitely, so we split it into segments. New messages are appended only to the active segment file. When it reaches a certain size, we create a new active segment to accept writes; non-active segments only serve read requests.
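A minimal sketch of the segmented append-only log described above; the file naming, size threshold, and API are assumptions made for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Sketch of a segmented WAL: writes append sequentially to the active segment;
// once it exceeds maxSegmentBytes, the segment is sealed (read-only) and a new
// active segment is created. Naming and threshold are illustrative assumptions.
public class SegmentedLog {
    private final Path dir;
    private final long maxSegmentBytes;
    private int activeIndex = 0;

    public SegmentedLog(Path dir, long maxSegmentBytes) throws IOException {
        this.dir = Files.createDirectories(dir);
        this.maxSegmentBytes = maxSegmentBytes;
    }

    public synchronized void append(byte[] record) throws IOException {
        Path active = dir.resolve("segment-" + activeIndex + ".log");
        if (Files.exists(active) && Files.size(active) >= maxSegmentBytes) {
            activeIndex++;                       // seal old segment, roll to a new one
            active = dir.resolve("segment-" + activeIndex + ".log");
        }
        // Pure sequential append; sealed segments only ever serve reads.
        Files.write(active, record, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }
}
```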
| Field | Type | Description |
| --- | --- | --- |
| key | byte[] | used to determine the partition of the message |
| value | byte[] | payload of the message |
| topic | string | the name of the topic |
| partition | integer | ID of the partition |
| offset | long | the position of the message in the partition; a message can be located by (topic, partition, offset) |
| timestamp | long | timestamp of the message |
| size | integer | size of the message |
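As a sketch, this schema maps to a simple value type like the one below; field types mirror the table, and Java is used only for illustration.

```java
// Sketch of the message schema above; field types mirror the table.
public record Message(
        byte[] key,      // determines the partition of the message
        byte[] value,    // payload of the message
        String topic,    // name of the topic
        int partition,   // ID of the partition
        long offset,     // position of the message within the partition
        long timestamp,  // timestamp of the message
        int size         // size of the message in bytes
) {}
```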
High Level Diagram
Scale the message queue -> a cluster of brokers coordinated by ZooKeeper for leader election.
Consumers -> consumer groups for better read throughput.
Components
Producer: pushes messages to specific topics.
Consumer group: subscribes to topics and consumes messages.
Broker: holds multiple partitions. A partition holds a subset of messages for a topic.
Storage:
Data storage: messages are persisted in data storage in partitions.
State storage: manages consumer states
The mapping between partitions and consumers.
The last consumed offset of consumer groups for each partition.
Metadata storage: persists configuration and properties of topics.
The number of partitions.
Retention period
Distribution of replicas.
Coordination service:
Service discovery: which brokers are alive.
Leader election: one of the brokers is selected as active controller responsible for assigning partitions.
Apache ZooKeeper or etcd are commonly used for electing a controller.
E2E
Producer
The producer sends messages to the routing layer.
The routing layer reads the replica distribution plan from metadata storage and caches it locally. When a message arrives, it routes the message to the leader replica of that topic/partition.
The leader replica receives the message and follower replicas pull data from the leader.
When enough replicas have synced, the leader commits the data, the message becomes available for consumption, and the leader responds to the producer.
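A conceptual sketch of that leader-side commit decision; the threshold and field names are assumptions for illustration, not a reference implementation.

```java
// Conceptual sketch: the leader replica commits a message (and the producer can
// be acked) only after enough in-sync replicas have persisted it.
// minInSyncReplicas is an illustrative assumption.
public class LeaderReplica {
    private final int minInSyncReplicas;
    private long committedOffset = -1;   // high-watermark: last consumable offset

    public LeaderReplica(int minInSyncReplicas) {
        this.minInSyncReplicas = minInSyncReplicas;
    }

    // replicasPersisted counts the leader itself plus followers that have pulled
    // and persisted data up to this offset.
    public synchronized boolean tryCommit(long offset, int replicasPersisted) {
        if (replicasPersisted >= minInSyncReplicas) {
            committedOffset = offset;    // message is now ready to be consumed
            return true;                 // caller responds to the producer
        }
        return false;                    // keep waiting for followers to catch up
    }
}
```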
Consumer
A new consumer that wants to join group-1 and subscribe to Topic-A first finds the coordinator broker of group-1.
The coordinator confirms that the consumer has joined and assigns partitions to it.
The consumer fetches messages starting from the last consumed offset, which is managed by the state storage.
The consumer processes the messages and commits the offset back to the broker.
Delivery semantics depend on the order of data processing and offset committing.
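A conceptual sketch of this consumer flow (join the group via the coordinator, fetch from the last committed offset, process, commit); every type and method name here is hypothetical.

```java
import java.util.List;

// Conceptual sketch of the consumer flow; all types and methods are hypothetical,
// shown only to make the sequence of steps concrete.
public class ConsumerLoop {

    interface GroupCoordinator { int joinGroup(String group, String topic); }

    interface Broker {
        long lastCommittedOffset(String group, String topic, int partition);
        List<byte[]> fetch(String topic, int partition, long offset);
        void commitOffset(String group, String topic, int partition, long offset);
    }

    public void run(GroupCoordinator coordinator, Broker broker, String group, String topic) {
        // 1. Join the group; the coordinator assigns a partition to this consumer.
        int partition = coordinator.joinGroup(group, topic);

        while (true) {
            // 2. Fetch messages starting from the last committed offset (state storage).
            long offset = broker.lastCommittedOffset(group, topic, partition);
            List<byte[]> messages = broker.fetch(topic, partition, offset);

            // 3. Process the messages, then 4. commit the new offset back to the broker.
            for (byte[] message : messages) {
                process(message);
            }
            broker.commitOffset(group, topic, partition, offset + messages.size());
        }
    }

    private void process(byte[] message) { /* application logic */ }
}
```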
Deep Dive
Routing Layer
The producer sends messages to the routing layer. The routing layer reads the replica distribution plan, caches it, and routes each message to the broker holding the leader replica.
Wrapping the routing layer into the producer and adding a buffer component lets the producer batch messages in memory and send out larger batches in a single request, which increases throughput.
Keeping routing inside the producer also means no extra network latency and no additional network hops.
Without the buffer, we cannot do batching.
Trade-off around batch size:
Large batch size: higher throughput, but higher latency.
Small batch size: lower throughput, but lower latency.
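A minimal sketch of that trade-off: buffer messages and flush either when the batch is big enough or when a time limit expires (the same idea as Kafka's batch.size and linger.ms settings); the class itself is purely illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a producer-side buffer: flush when the batch is large enough
// (throughput) or when a time limit expires (bounds the added latency).
// Mirrors the idea behind Kafka's batch.size / linger.ms, but this class is
// purely illustrative.
public class ProducerBuffer {
    private final int maxBatchBytes;     // larger -> higher throughput, higher latency
    private final long maxLingerMillis;  // smaller -> lower latency, smaller batches

    private final List<byte[]> batch = new ArrayList<>();
    private int batchBytes = 0;
    private long firstAppendAt = -1;

    public ProducerBuffer(int maxBatchBytes, long maxLingerMillis) {
        this.maxBatchBytes = maxBatchBytes;
        this.maxLingerMillis = maxLingerMillis;
    }

    public synchronized void append(byte[] message) {
        if (batch.isEmpty()) {
            firstAppendAt = System.currentTimeMillis();
        }
        batch.add(message);
        batchBytes += message.length;
        if (batchBytes >= maxBatchBytes
                || System.currentTimeMillis() - firstAppendAt >= maxLingerMillis) {
            flush();
        }
    }

    private void flush() {
        // send the whole batch to the leader replica in one request (omitted)
        batch.clear();
        batchBytes = 0;
    }
}
```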
Consumer Delivery - Push vs Pull
How are messages delivered from broker to consumer?
Push: low latency, since the broker can push a message right after receiving it.
Push: if the consumption rate falls below the production rate, consumers get overwhelmed.
Pull: the consumer controls the consumption rate; if the consumption rate falls below the production rate, we can scale up consumers.
Our message queue uses the pull model.
Consumer Rebalancing
Producer Ack
In-sync replicas
Ack 0
Fire and forget.
The producer keeps sending messages to the leader without waiting for acks.
Low latency with possible data loss.
Great use case for collecting metrics, logging data.
Ack 1
The producer waits for the leader broker to persist the message.
The leader might fail immediately after sending the ack, so there is still potential data loss.
Ack all
The producer waits for all in-sync replicas to persist the message.
Provides strong message durability, but latency is high, almost 2.5 times the original latency.
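For reference, Kafka exposes this exact choice through the producer's acks setting; a minimal sketch (broker address, topic, and key/value are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Example of the ack trade-off using Kafka's producer, whose acks setting maps
// directly onto the options above ("0", "1", "all").
public class AckDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // "0"   = fire and forget (lowest latency, possible data loss)
        // "1"   = leader persisted the message (loss if leader fails right after)
        // "all" = all in-sync replicas persisted it (strongest durability, slowest)
        props.put("acks", "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("payments", "order-42", "charged"),
                    (metadata, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace();     // e.g. not enough ISRs
                        }
                    });
            producer.flush();
        }
    }
}
```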
If a topic is hot, we can add more partitions to reduce the connection load on the leader replica.
Reading from the leader replica might not always be the best option; we can let consumers subscribe to the closest ISR.
ISR is determined by topic configuration.
replica.lag.max.messages = 4 means a follower is in the ISR as long as it lags the leader by no more than 3 messages.
Consumer Receipt (Delivery)
At most once
Producer: ack = 0, fire and forget.
Consumer: commit the offset before processing the data.
At least once
Producer: ack = 1 or ack = all.
Consumer: commit the offset after processing the data; the same data may be processed twice.
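The two semantics differ only in when the offset is committed relative to processing; a sketch using Kafka's consumer for illustration (topic, group, and address are placeholders):

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// At-most-once vs at-least-once is just the order of commit vs processing.
public class DeliverySemantics {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "group-1");                    // placeholder
        props.put("enable.auto.commit", "false");            // commit manually
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("topic-a"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));

                // At-most-once: commit BEFORE processing; a crash after the commit
                // loses the unprocessed records, but nothing is processed twice.
                // consumer.commitSync();

                for (ConsumerRecord<String, String> record : records) {
                    process(record.value());
                }

                // At-least-once: commit AFTER processing; a crash before this line
                // means the same records are redelivered and processed twice.
                consumer.commitSync();
            }
        }
    }

    private static void process(String value) { /* application logic */ }
}
```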
Exactly once (payment, trading, accounting)
Ack = all
Use an idempotency key in the message data.
Use a transaction coordinator to track pending status, plus an abort marker if something goes wrong.
Two-phase commit.
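Kafka, for example, builds exactly-once on an idempotent, transactional producer plus read_committed consumers; a sketch of the producer side (transactional.id, topic, and address are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Sketch of exactly-once production with Kafka's transactional producer: the
// transaction coordinator tracks the pending transaction and an abort marker
// is written if it fails (a two-phase-commit-style protocol).
public class ExactlyOnceProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");     // placeholder
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("acks", "all");
        props.put("enable.idempotence", "true");               // dedupe retried sends
        props.put("transactional.id", "payments-producer-1");  // placeholder

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("payments", "order-42", "charged"));
                producer.commitTransaction();                   // commit marker written
            } catch (Exception e) {
                producer.abortTransaction();                    // abort marker written
            }
        }
        // Consumers must read with isolation.level=read_committed to skip
        // messages from aborted transactions.
    }
}
```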
Questions