# Photon: Fault-tolerant and Scalable Joining of Continuous Data Streams - Google, 2013

• [Paper] [Mirror]

• Joining multiple streams with exactly once processing
• Joins are distributed geographically for fault tolerance
• Not for general computation like MillWheel

• Originally designed for joining search query data with ad click data
• These are two separate streams that do not have a guaranteed global order
• It’s not unusual for one server’s clicks to be severely delayed
• The streams come from many front end servers via logs
• They didn’t want to embed the query data in the click URL, just an ID
• Embedding it would make the URLs giant
• And they’d have to trust data echoed back by the user
• They can never charge an advertiser twice, but they want to make sure they join all of the results eventually, leading to:
• at-most-once at any time
• near-exact in real-time
• exactly-once eventually
• They achieve fault tolerance partially by running the join in multiple DCs
• They use a system called the IdRegistry, which prevents duplicate outputs of the join via a read-modify-write transaction
• Each cluster retries the join until there is a success
• IdRegistry
• KV store, PaxosDB
• Before an output is written, the node attempts to commit in the IdRegistry using the join key
• If the record already exists, it skips it, assuming it has been joined successfully
• While the IdRegistry commit is atomic, the node could still fail after writing the value
• There is a system that periodically checks and re-injects events that were present in the IdRegistry but not in the output
• For performance, the node checks the IdRegistry before performing the join lookup, but confirms at the end with the atomic transaction
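The check-then-commit flow above can be sketched roughly as follows; this is a minimal stand-in, not the paper's implementation, and all names (`IdRegistry`, `process_click`, etc.) are invented:

```python
# Sketch of the joiner's commit protocol: cheap pre-check, then an
# atomic insert-if-absent before the output is considered committed.

class IdRegistry:
    """In-memory stand-in for the replicated, Paxos-backed KV store."""
    def __init__(self):
        self._committed = {}

    def contains(self, key):
        return key in self._committed

    def commit(self, key, value):
        """Atomic insert-if-absent (read-modify-write). True on success."""
        if key in self._committed:
            return False
        self._committed[key] = value
        return True

def process_click(registry, event_store, output, click_id, joiner_id):
    # Cheap pre-check: skip the work if another DC already joined this click.
    if registry.contains(click_id):
        return False
    query = event_store.get(click_id)   # look up the query side of the join
    if query is None:
        return False                    # dispatcher will retry later
    # Confirm with the atomic transaction; only the winner writes output.
    if registry.commit(click_id, joiner_id):
        output.append((click_id, query))
        return True
    return False
```

The pre-check avoids a wasted EventStore lookup in the common case where the other DC won, but correctness rests entirely on the final atomic commit.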
• Since the IdRegistry is located in geographically distant DCs, there can only be a few transactions per second (less than 10) as the latency between DCs is high
• They batch many transactions into one Paxos transaction
• They make sure two transactions don’t conflict in one batch at the application level
• They also shard the IdRegistry
• The partitioning can be adjusted dynamically
• They can put a new map in PaxosDB which is effective from a specified timestamp onwards
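A timestamp-effective shard map like that could look something like this sketch (names and structure are my guesses, not the paper's):

```python
import bisect

# Each entry says "from this event timestamp onwards, use this many
# shards"; lookups pick the mapping in effect at the event's timestamp,
# so old and new events route consistently during a resharding.

class ShardMap:
    def __init__(self):
        self._entries = []  # sorted list of (effective_ts, num_shards)

    def add_mapping(self, effective_ts, num_shards):
        bisect.insort(self._entries, (effective_ts, num_shards))

    def shard_for(self, key, event_ts):
        # Latest mapping effective at or before event_ts.
        i = bisect.bisect_right(self._entries, (event_ts, float("inf"))) - 1
        _, num_shards = self._entries[max(i, 0)]
        return hash(key) % num_shards
```

Keying the choice on the event timestamp (rather than wall clock at lookup time) is what lets the resharding take effect without two servers disagreeing about where a given key lives.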
• Dispatcher and Joiner
• The dispatcher reads the click logs and sends data to the joiner to be joined (after checking the IdRegistry)
• The joiner is stateless; it looks up the query data in the event store
• Initially, it’s not clear why these need to be separate processes
• I believe they did this so they could scale the components individually
• The dispatcher stores the status of the log files it’s reading in GFS
• This allows for worker restart
• Sometimes the query data may not be in the EventStore yet, so the dispatcher is responsible for periodically retrying
• Exponential backoff
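The retry policy is presumably something like the usual capped exponential backoff; a trivial sketch (all constants are invented, not from the paper):

```python
import random

# Generator of retry delays for the dispatcher: each failed join
# attempt waits exponentially longer, up to a cap.

def backoff_delays(base=1.0, factor=2.0, cap=60.0, jitter=False):
    """Yield exponentially growing retry delays in seconds, capped."""
    delay = base
    while True:
        yield min(delay, cap) * (random.uniform(0.5, 1.0) if jitter else 1.0)
        delay *= factor
```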
• Sometimes the joiner may send a transaction to the IdRegistry, but fail to receive the ack
• The value stored for the join key in the IdRegistry is an ID for the joiner
• If the joiner retries, the IdRegistry will see that it’s from the same joiner and just ack the request (without actually changing anything)
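That retry-friendly commit can be sketched as below (a stand-in, with invented names; the registry value is the committing joiner's ID):

```python
# A commit retried by the same joiner after a lost ack is simply
# re-acked without modifying state; a different joiner is rejected.

def idempotent_commit(registry, key, joiner_id):
    """registry: dict mapping join key -> joiner ID. Returns True on ack."""
    existing = registry.get(key)
    if existing is None:
        registry[key] = joiner_id   # first commit wins
        return True
    # Same joiner retrying after a lost ack: re-ack, change nothing.
    return existing == joiner_id
```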
• EventStore
• Stores the query log side of the join
• CacheEventStore
• In-memory store
• It’s not perfect, but misses go to the LogsEventStore
• LogsEventStore
• While there is no global order for the data in the logs, we can approximate the position
• A reader runs through the logs, periodically saving the timestamp of an event to a map
• This is done as part of filling the cache
• This mapping is stored in BigTable with the key (ServerIP:ProcessID:Timestamp)
• This assumes events are ordered per pid
• When the store wants to read a query, it consults the mapping to find a place to start a range scan on BigTable
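A rough sketch of that checkpoint map and lookup, assuming (as the notes say) that events are ordered per process; the class and method names are invented:

```python
import bisect

# Checkpoints map an approximate position (server IP, process ID,
# timestamp) to an offset, so a lookup can start its range scan near
# the target event instead of at the beginning of the log.

class LogPositionMap:
    def __init__(self):
        # (server_ip, pid) -> sorted list of (timestamp, offset)
        self._checkpoints = {}

    def record(self, server_ip, pid, timestamp, offset):
        """Called periodically while a reader scans through the logs."""
        bisect.insort(self._checkpoints.setdefault((server_ip, pid), []),
                      (timestamp, offset))

    def scan_start(self, server_ip, pid, timestamp):
        """Latest checkpointed offset at or before `timestamp`."""
        points = self._checkpoints.get((server_ip, pid), [])
        i = bisect.bisect_right(points, (timestamp, float("inf"))) - 1
        return points[i][1] if i >= 0 else 0
```

In the real system the checkpoints live in BigTable under the (ServerIP:ProcessID:Timestamp) key and the scan runs over BigTable rows; the dict here just stands in for that.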
• Design Lessons
• Minimizing critical state in PaxosDB was important
• IdRegistry can easily become a bottleneck
• I wonder whether, instead of actively querying the IdRegistry for the first check, they could read a delayed log of the transactions
• However that means they are basically acting as read-only delayed replicas
• My guess is most of the load is actually in the Paxos latency