Top K / Ad Click Aggregation
https://www.uber.com/blog/real-time-exactly-once-ad-event-processing/
Topics:
DB Choice: Pinot, Cassandra, or big data infra.
Raw data vs aggregated data?
Exactly-once processing
Batch vs streaming
Kappa vs Lambda
How to manage aggregation window?
How to do recalculation and reconciliation?
Q:
How are those ad click events stored and fetched?
Log files are located on different servers, and events are appended to the end of the files.
What is the scale of the system? 1B ad clicks per day, 2M ads in total. The number of ad click events grows 30% year over year.
How often do we update it, and how real-time should the ad click report be? Say an ad click happened 5 minutes ago; how soon should it show up in the aggregation? A few minutes of end-to-end latency is acceptable for ad click aggregation; real-time bidding needs less than a second.
Functional Requirements:
Aggregate the number of clicks of ad_id in the last M minutes.
Return the top 100 most clicked ad_id every minute.
Support aggregation filtering by different attributes.
Non-functional Requirements
Highly available
Highly scalable
Low latency, real-time experience
Reliability: events cannot be lost.
Accuracy: exactly-once or idempotency.
Properly handle delayed or duplicate events.
Scale
1B users, 2 clicks a day = 2B clicks a day.
2*10^9 / 10^5 (roughly the number of seconds per day) = 2*10^4 = 20000 QPS.
Peak ad click QPS = 20000 * 5 = 100000 QPS
Assume a single ad click event occupies 0.1KB of storage. Daily storage requirement is 0.1KB * 2B = 200GB.
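A quick back-of-envelope check of the numbers above in Python (the 2 clicks/user/day, 5x peak factor, and 0.1KB/event figures are the assumptions stated in this section):

```python
# Back-of-envelope estimate for QPS and daily storage (numbers from the
# assumptions above: 1B users, 2 clicks/user/day, 0.1KB per click event).
SECONDS_PER_DAY = 24 * 60 * 60          # 86,400 (~10^5 in the rough estimate)

daily_clicks = 1_000_000_000 * 2        # 2B click events per day
avg_qps = daily_clicks / SECONDS_PER_DAY
peak_qps = avg_qps * 5                  # assume peak traffic is 5x average

event_size_kb = 0.1
daily_storage_gb = daily_clicks * event_size_kb / 1_000_000   # KB -> GB

print(f"avg QPS  ~ {avg_qps:,.0f}")                 # ~23,000 (20,000 with the 10^5 shortcut)
print(f"peak QPS ~ {peak_qps:,.0f}")                # ~115,000
print(f"daily storage ~ {daily_storage_gb:,.0f} GB")  # ~200 GB
```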
API
Aggregate the number of clicks of ad_id in the last M minutes
Return top N most clicked ad_ids in the last M minutes
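To make the top-N query concrete, here is a minimal sketch that assumes the per-minute aggregated counts are already available in memory; top_n_most_clicked and its input shape are illustrative, not part of the actual API:

```python
import heapq
from collections import Counter

def top_n_most_clicked(counts_per_minute, n=100):
    """counts_per_minute: iterable of dicts {ad_id: click_count}, one per minute
    of the last M minutes. Returns the n ad_ids with the highest total count."""
    total = Counter()
    for minute_counts in counts_per_minute:
        total.update(minute_counts)
    # heapq.nlargest avoids sorting the full ~2M-ad counter just to get the top 100.
    return heapq.nlargest(n, total.items(), key=lambda kv: kv[1])

# Example: last 3 minutes of aggregated counts.
minutes = [{"ad1": 5, "ad2": 3}, {"ad1": 2, "ad3": 7}, {"ad2": 1}]
print(top_n_most_clicked(minutes, n=2))   # ad1 and ad3 both total 7 clicks
```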
Data Schema
Raw data
Aggregated data
To support ad filtering, we can add an additional filter_id column to the above table, together with a filter table describing each filter (e.g. filter_id 0012 with region = US, where "*" marks attributes the filter does not constrain).
Most clicked ads table:
window_size: the aggregation window size (M) in minutes.
update_time_minute: the minute at which this row was last updated.
most_clicked_ads: the list of most clicked ad_ids.
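A minimal sketch of the two aggregated tables as Python dataclasses. Field names follow the notes above; the click_minute format and the "*" wildcard default are illustrative assumptions:

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class AggregatedAdCount:
    """One row of the aggregated-data table: per-minute click count for one ad.
    filter_id is only needed when aggregation filtering is enabled."""
    ad_id: str
    click_minute: str        # minute-granularity bucket, e.g. "202301010003" (assumed format)
    count: int
    filter_id: str = "*"     # "*" = no filter, per the wildcard convention above

@dataclass
class MostClickedAds:
    """One row of the most-clicked-ads table."""
    window_size: int                      # aggregation window size (M) in minutes
    update_time_minute: str               # minute at which this row was last updated
    most_clicked_ads: List[str] = field(default_factory=list)  # top ad_ids
```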
Raw data vs Aggregated data
Raw data only:
Pros: full data set; supports data filtering and recalculation.
Cons: huge data storage; slow queries.
Aggregated data only:
Pros: smaller data set; fast queries.
Cons: data loss, since it is derived data (e.g., 10 raw entries might be aggregated into 1 entry).
We store both.
It's a good idea to keep raw data. If something goes wrong, we could use the raw data for debugging. If the aggregated data is corrupted due to a bad bug, we can recalculate the aggregated data from raw data.
Aggregated data should be stored as well. The raw data set is huge, the large size makes querying raw data very inefficient.
We can also move old data to cold storage to reduce cost.
DB Choice
Write load is heavy, so use a NoSQL store like Cassandra or a time-series DB like InfluxDB.
We can also consider storing ORC/Parquet files in S3/GCS with a Hive Metastore catalog on top, and use a query engine like Presto or BigQuery to query the results.
High Level Design
E2E
Log Watcher sends logs to Kafka.
DB Writer pulls logs from Kafka and stores raw data in the DB.
Aggregation service pulls the committed offset from Kafka along with a micro-batch of data.
Aggregation service aggregates the ad click counts using Flink.
Aggregation service fetches the current counter from the Cassandra DB.
Aggregation service adds the new counts to the counter and updates the DB with the latest result.
Aggregation service commits the offset back to Kafka.
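A sketch of the aggregation-service loop described above. kafka and counters_db are hypothetical client wrappers (not real library APIs); the point is the ordering: pull a micro-batch, aggregate, read-modify-write the counters, and commit the offset only at the end:

```python
from collections import Counter

def run_aggregation_loop(kafka, counters_db, batch_size=1000):
    """kafka / counters_db are hypothetical client wrappers; this only shows
    the ordering of the end-to-end steps above."""
    while True:
        # 1. Pull a micro-batch of click events plus the offset it ends at.
        events, end_offset, partition = kafka.poll_batch(batch_size)
        if not events:
            continue

        # 2. Aggregate click counts per (ad_id, minute) in memory
        #    (Flink does this step in the design above; here it's plain Python).
        delta = Counter((e["ad_id"], e["click_minute"]) for e in events)

        # 3-5. Read the current counters, add the delta, write back.
        for (ad_id, minute), n in delta.items():
            current = counters_db.get_count(ad_id, minute) or 0
            counters_db.upsert_count(ad_id, minute, current + n)

        # 6. Commit the offset back to Kafka only after the DB write succeeded.
        kafka.commit(partition, end_offset)
```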
Deep Dive
How to make sure aggregated data is committed atomically?
Why do we need atomic commits?
If the aggregation result is written to the DB but the offset is not committed successfully back to Kafka, we would end up processing the same batch multiple times, leading to over-counting.
Solution: store the Kafka offset as a version for every Kafka partition in the DB, and only apply a batch if its offset is newer than the stored one, essentially making the update idempotent.
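A minimal sketch of the offset-as-version idea, with an in-memory dict standing in for the DB. A batch is applied only if its offset is newer than the last applied offset for that partition, so replaying a batch after a failed offset commit does not double-count:

```python
# In-memory stand-in for the aggregation DB: counters plus the last applied
# offset per Kafka partition. In the real DB the counter update and the offset
# write must happen atomically (e.g., a conditional write); that detail is out
# of scope for this sketch.
counters = {}              # (ad_id, minute) -> count
applied_offset = {}        # partition -> last offset whose batch was applied

def apply_batch(partition, batch_offset, delta):
    """delta: {(ad_id, minute): count} aggregated from one micro-batch.
    Returns False if this batch was already applied (replay after a crash)."""
    if applied_offset.get(partition, -1) >= batch_offset:
        return False                     # duplicate delivery -> no-op (idempotent)
    for key, n in delta.items():
        counters[key] = counters.get(key, 0) + n
    applied_offset[partition] = batch_offset
    return True

# Replaying the same batch twice only counts it once:
apply_batch(0, 42, {("ad1", "10:03"): 3})
apply_batch(0, 42, {("ad1", "10:03"): 3})
assert counters[("ad1", "10:03")] == 3
```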
Streaming vs Batching
Online services vs batch vs streaming systems:
Responsiveness: an online service responds to the client quickly; batch and streaming systems do not respond to the client.
Input: user requests (online service); bounded input with finite size (batch); infinite streams (streaming).
Output: responses to clients (online service); aggregated metrics / materialized views (batch and streaming).
Performance measurement: availability and latency (online service); throughput (batch); throughput and latency (streaming).
Example: online shopping (online service); MapReduce (batch); Flink (streaming).
Lambda vs Kappa
Lambda: A system that contains two processing paths (batch and streaming) simultaneously.
Kappa: A system that combines batch and streaming into one processing path. The key idea is to handle both real-time processing and continuous data reprocessing with a single stream processing engine; the only difference is the input (the live stream vs. replayed stored raw data).
Scalability: in Lambda, the batch and streaming paths scale independently; in Kappa, reprocessing large amounts of historical data through the single streaming path can be a scaling challenge.
Overhead: Lambda carries the operational overhead of maintaining two processing paths; Kappa avoids that overhead with a single path.
Data recalculation: Lambda recomputes results in the batch path; Kappa replays the stored raw events through the same stream processing engine.
Time
Event time: when an ad click happens on client.
Processing time: system time of aggregation server that processes the click event.
Event time:
Pros: more accurate, because the client knows exactly when an ad was clicked.
Cons: depends on client-generated timestamps, which may be wrong or set by malicious users; we also have to deal with delayed events.
Processing time:
Pros: more reliable, since the timestamp is generated by our own servers.
Cons: not accurate if the event reaches the system much later than it occurred.
Use both event time and processing time for more accurate results.
To adjust for incorrect device clocks, one approach is to log three timestamps:
(1) The time at which the event occurred, according to the device clock.
(2) The time at which the event was sent to the server, according to the device clock.
(3) The time at which the event was received by the server, according to the server clock.
offset = (3) - (2), which approximates the device's clock skew (ignoring network delay).
estimated true event time = (1) + offset.
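A tiny sketch of this three-timestamp correction (timestamps are illustrative Unix seconds):

```python
def estimate_event_time(device_event_ts, device_send_ts, server_receive_ts):
    """offset = server_receive_ts - device_send_ts approximates the device's
    clock skew (network delay is ignored, so this is only an estimate)."""
    offset = server_receive_ts - device_send_ts
    return device_event_ts + offset

# Device clock is 120s behind the server: the event "occurred" at 1000 by the
# device clock, was sent at 1005, and arrived at 1125 in server time.
print(estimate_event_time(1000, 1005, 1125))  # 1120 -> corrected event time
```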
How do we deal with delayed events?
Ignore straggler events; they are probably a small percentage, but use metrics to track them.
Publish a correction: an updated value for the window with stragglers included.
Use a "watermark" to extend the aggregation window and wait for late events (see the sketch below). The watermark length depends on business requirements: longer is more accurate but adds latency.
Aggregation Window
Tumbling window
Fixed length. If you have a 1-minute tumbling window, all events between 10:03:00 and 10:03:59 will be grouped in one window, next window would be 10:04:00-10:04:59
Sliding window
Events that occur within some interval of each other. For example, a 5-minute sliding window would cover 10:03:39 and 10:08:12 because they are less than 5 minutes apart.
Hopping window
Fixed length, but allows windows to overlap in order to provide some smoothing. If you have a 5-minute window with a hop size of 1 minute, it would contain the events between 10:03:00 and 10:07:59, next window would cover 10:04:00-10:08:59
Session window
No fixed duration: group all events for the same user, and end the window when the user has been inactive for some time (e.g., 30 minutes). Common in website analytics.
Tumbling windows and sliding windows are the most relevant to our problem (a small sketch of both follows below).
What's the difference between a hopping window and a sliding window?
A hopping window has fixed boundaries that advance by a fixed hop size (so windows overlap at fixed positions), while a sliding window moves continuously with each event, popping events out only once they fall further back than the window length.
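A small sketch of the two window types we care about here, under assumed sizes (a 60-second tumbling window and a 5-minute sliding window) and assuming events are processed roughly in timestamp order:

```python
from collections import deque

def tumbling_window_start(event_ts: int, size: int = 60) -> int:
    """A tumbling window assigns each event to exactly one fixed, non-overlapping bucket."""
    return (event_ts // size) * size

class SlidingWindowCounter:
    """Click count over the last `size` seconds: old events are evicted as soon as
    they fall outside the window, so the window 'slides' with every new event."""
    def __init__(self, size: int = 300):
        self.size = size
        self.events = deque()            # event timestamps, oldest first

    def add(self, event_ts: int) -> int:
        self.events.append(event_ts)
        while self.events and self.events[0] <= event_ts - self.size:
            self.events.popleft()        # expired: farther back than the window size
        return len(self.events)          # current count within the sliding window

# 10:03:00 and 10:03:59 share one 1-minute tumbling bucket; 10:04:00 starts a new one.
assert tumbling_window_start(180) == tumbling_window_start(239) != tumbling_window_start(240)

w = SlidingWindowCounter(size=300)
for ts in (219, 492):                    # 10:03:39 and 10:08:12, in seconds from 10:00
    count = w.add(ts)
print(count)                             # 2 -> both fall within one 5-minute sliding window
```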
How to scale Kafka?
Producer: don't limit the number of producer instances; they can be scaled out easily.
Consumer: the rebalancing mechanism helps scale consumers by adding or removing nodes.
Rebalancing can be slow, so it is recommended to do it during off-peak hours.
Hashing key: use ad_id as the hashing key for the Kafka partition, so events for the same ad go to the same partition.
Number of partitions: pre-allocate enough partitions in advance. If the partition count changes, events for the same ad may be mapped to a different partition.
Topic sharding: shard the data by geography (topic_north_america, topic_eu, topic_asia, etc.) or by business type (topic_web_ads, topic_mobile_ads, etc.).
Pros: more throughput. Cons: more complexity.
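A small illustration of why keying on ad_id keeps one ad on one partition, and why changing the partition count remaps ads. The md5-based partitioner here is only illustrative; Kafka's default partitioner hashes the message key (murmur2) to pick a partition:

```python
import hashlib

def partition_for(ad_id: str, num_partitions: int) -> int:
    """Deterministic hash of the key -> partition index (md5 used only for illustration)."""
    h = int(hashlib.md5(ad_id.encode()).hexdigest(), 16)
    return h % num_partitions

# Every event for "ad_42" lands on the same partition...
print(partition_for("ad_42", 8), partition_for("ad_42", 8))   # same number twice
# ...but changing the partition count can move it, which is why partitions
# should be pre-allocated generously up front.
print(partition_for("ad_42", 8), partition_for("ad_42", 12))  # may differ
```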
How to make sure downstream consumers also have an exactly-once guarantee?
We can introduce an additional Kafka layer to publish results to downstream topics.
Producer: acks=all for the strongest durability (acks=1 is a weaker, lower-latency option).
Consumer: isolation.level=read_committed to read only transactionally committed messages.
Use a UUID as an idempotency key.
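A minimal sketch of consumer-side idempotency with a UUID key: the producer attaches a UUID when the logical event is created, and the downstream consumer skips anything it has already applied (in practice the seen-set would live in a durable store, not process memory):

```python
import uuid

def make_event(ad_id: str) -> dict:
    # Producer side: attach a UUID once, when the logical event is created.
    return {"event_id": str(uuid.uuid4()), "ad_id": ad_id}

seen_event_ids = set()   # in practice: a durable store (possibly with a TTL)

def consume(event: dict) -> bool:
    """Returns True if the event was applied, False if it was a duplicate."""
    if event["event_id"] in seen_event_ids:
        return False                     # duplicate delivery -> skip
    seen_event_ids.add(event["event_id"])
    # ... apply the event to downstream state here ...
    return True

e = make_event("ad_42")
assert consume(e) is True
assert consume(e) is False               # redelivery of the same event is a no-op
```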